2358 lines
96 KiB
Python
2358 lines
96 KiB
Python
# Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""
|
|
BGP peer related classes and utils.
|
|
"""
|
|
|
|
from collections import namedtuple
|
|
import itertools
|
|
import logging
|
|
import socket
|
|
import time
|
|
import traceback
|
|
|
|
from os_ken.services.protocols.bgp.base import Activity
|
|
from os_ken.services.protocols.bgp.base import Sink
|
|
from os_ken.services.protocols.bgp.base import Source
|
|
from os_ken.services.protocols.bgp.base import SUPPORTED_GLOBAL_RF
|
|
from os_ken.services.protocols.bgp import constants as const
|
|
from os_ken.services.protocols.bgp.model import OutgoingRoute
|
|
from os_ken.services.protocols.bgp.model import SentRoute
|
|
from os_ken.services.protocols.bgp.info_base.base import PrefixFilter
|
|
from os_ken.services.protocols.bgp.info_base.base import AttributeMap
|
|
from os_ken.services.protocols.bgp.model import ReceivedRoute
|
|
from os_ken.services.protocols.bgp.net_ctrl import NET_CONTROLLER
|
|
from os_ken.services.protocols.bgp.rtconf.neighbors import NeighborConfListener
|
|
from os_ken.services.protocols.bgp.rtconf.neighbors import CONNECT_MODE_PASSIVE
|
|
from os_ken.services.protocols.bgp.signals.emit import BgpSignalBus
|
|
from os_ken.services.protocols.bgp.speaker import BgpProtocol
|
|
from os_ken.services.protocols.bgp.info_base.ipv4 import Ipv4Path
|
|
from os_ken.services.protocols.bgp.info_base.vpnv4 import Vpnv4Path
|
|
from os_ken.services.protocols.bgp.info_base.vpnv6 import Vpnv6Path
|
|
from os_ken.services.protocols.bgp.rtconf.vrfs import VRF_RF_IPV4, VRF_RF_IPV6
|
|
from os_ken.services.protocols.bgp.utils import bgp as bgp_utils
|
|
from os_ken.services.protocols.bgp.utils.evtlet import EventletIOFactory
|
|
from os_ken.services.protocols.bgp.utils import stats
|
|
from os_ken.services.protocols.bgp.utils.validation import is_valid_old_asn
|
|
|
|
from os_ken.lib.packet import bgp
|
|
|
|
from os_ken.lib.packet.bgp import RouteFamily
|
|
from os_ken.lib.packet.bgp import RF_IPv4_UC
|
|
from os_ken.lib.packet.bgp import RF_IPv6_UC
|
|
from os_ken.lib.packet.bgp import RF_IPv4_VPN
|
|
from os_ken.lib.packet.bgp import RF_IPv6_VPN
|
|
from os_ken.lib.packet.bgp import RF_IPv4_FLOWSPEC
|
|
from os_ken.lib.packet.bgp import RF_VPNv4_FLOWSPEC
|
|
from os_ken.lib.packet.bgp import RF_RTC_UC
|
|
from os_ken.lib.packet.bgp import get_rf
|
|
|
|
from os_ken.lib.packet.bgp import BGPOpen
|
|
from os_ken.lib.packet.bgp import BGPUpdate
|
|
from os_ken.lib.packet.bgp import BGPRouteRefresh
|
|
|
|
from os_ken.lib.packet.bgp import BGP_ERROR_CEASE
|
|
from os_ken.lib.packet.bgp import BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN
|
|
from os_ken.lib.packet.bgp import BGP_ERROR_SUB_CONNECTION_COLLISION_RESOLUTION
|
|
|
|
|
|
from os_ken.lib.packet.bgp import BGP_MSG_UPDATE
|
|
from os_ken.lib.packet.bgp import BGP_MSG_KEEPALIVE
|
|
from os_ken.lib.packet.bgp import BGP_MSG_ROUTE_REFRESH
|
|
|
|
from os_ken.lib.packet.bgp import BGPPathAttributeNextHop
|
|
from os_ken.lib.packet.bgp import BGPPathAttributeAsPath
|
|
from os_ken.lib.packet.bgp import BGPPathAttributeAs4Path
|
|
from os_ken.lib.packet.bgp import BGPPathAttributeLocalPref
|
|
from os_ken.lib.packet.bgp import BGPPathAttributeExtendedCommunities
|
|
from os_ken.lib.packet.bgp import BGPPathAttributeOriginatorId
|
|
from os_ken.lib.packet.bgp import BGPPathAttributeClusterList
|
|
from os_ken.lib.packet.bgp import BGPPathAttributeMpReachNLRI
|
|
from os_ken.lib.packet.bgp import BGPPathAttributeMpUnreachNLRI
|
|
from os_ken.lib.packet.bgp import BGPPathAttributeCommunities
|
|
from os_ken.lib.packet.bgp import BGPPathAttributeMultiExitDisc
|
|
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYPE_ORIGIN
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYPE_AGGREGATOR
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYPE_AS4_AGGREGATOR
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYPE_AS_PATH
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYPE_AS4_PATH
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYPE_NEXT_HOP
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYPE_MP_REACH_NLRI
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYPE_MP_UNREACH_NLRI
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYPE_MULTI_EXIT_DISC
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYPE_COMMUNITIES
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYPE_ORIGINATOR_ID
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYPE_CLUSTER_LIST
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYPE_EXTENDED_COMMUNITIES
|
|
from os_ken.lib.packet.bgp import BGP_ATTR_TYEP_PMSI_TUNNEL_ATTRIBUTE
|
|
|
|
from os_ken.lib.packet.bgp import BGPTwoOctetAsSpecificExtendedCommunity
|
|
from os_ken.lib.packet.bgp import BGPIPv4AddressSpecificExtendedCommunity
|
|
|
|
from os_ken.lib.packet import safi as subaddr_family
|
|
|
|
LOG = logging.getLogger('bgpspeaker.peer')
|
|
|
|
|
|
def is_valid_state(state):
|
|
"""Returns True if given state is a valid bgp finite state machine state.
|
|
"""
|
|
return state in const.BGP_FSM_VALID_STATES
|
|
|
|
|
|
class PeerRf(object):
|
|
"""State maintained per-RouteFamily for a Peer."""
|
|
|
|
def __init__(self, peer, route_family, enabled=False):
|
|
assert peer and route_family
|
|
|
|
self.enabled = enabled
|
|
|
|
# Back pointers.
|
|
self.peer = peer
|
|
self.rf = route_family
|
|
|
|
|
|
PeerCounterNames = namedtuple(
|
|
'PeerCounterNames',
|
|
('RECV_PREFIXES',
|
|
'RECV_UPDATES',
|
|
'SENT_UPDATES',
|
|
'RECV_NOTIFICATION',
|
|
'SENT_NOTIFICATION',
|
|
'SENT_REFRESH',
|
|
'RECV_REFRESH',
|
|
'FSM_ESTB_TRANSITIONS')
|
|
)(
|
|
'recv_prefixes',
|
|
'recv_updates',
|
|
'sent_updates',
|
|
'recv_notification',
|
|
'sent_notification',
|
|
'sent_refresh',
|
|
'recv_refresh',
|
|
'fms_established_transitions'
|
|
)
|
|
|
|
|
|
class PeerState(object):
|
|
"""A BGP neighbor state. Think of this class as of information and stats
|
|
container for Peer.
|
|
"""
|
|
|
|
def __init__(self, peer, signal_bus):
|
|
# Back pointer to peer whose stats this instances represents.
|
|
self.peer = peer
|
|
# Current state of BGP finite state machine.
|
|
self._bgp_state = const.BGP_FSM_IDLE
|
|
self._established_time = 0
|
|
self._last_bgp_error = None
|
|
self.counters = {
|
|
'recv_prefixes': 0,
|
|
'recv_updates': 0,
|
|
'sent_updates': 0,
|
|
'recv_notification': 0,
|
|
'sent_notification': 0,
|
|
'sent_refresh': 0,
|
|
'recv_refresh': 0,
|
|
'fms_established_transitions': 0,
|
|
}
|
|
self._signal_bus = signal_bus
|
|
|
|
# TODO(JK): refactor other counters to use signals also
|
|
self._signal_bus.register_listener(
|
|
('error', 'bgp', self.peer),
|
|
self._remember_last_bgp_error
|
|
)
|
|
|
|
self._signal_bus.register_listener(
|
|
BgpSignalBus.BGP_NOTIFICATION_RECEIVED + (self.peer,),
|
|
lambda _, msg: self.incr(PeerCounterNames.RECV_NOTIFICATION)
|
|
)
|
|
|
|
self._signal_bus.register_listener(
|
|
BgpSignalBus.BGP_NOTIFICATION_SENT + (self.peer,),
|
|
lambda _, msg: self.incr(PeerCounterNames.SENT_NOTIFICATION)
|
|
)
|
|
|
|
def _remember_last_bgp_error(self, identifier, data):
|
|
self._last_bgp_error = dict([(k, v)
|
|
for k, v in data.items()
|
|
if k != 'peer'])
|
|
|
|
@property
|
|
def recv_prefix(self):
|
|
# Number of prefixes received from peer.
|
|
return self.counters[PeerCounterNames.RECV_PREFIXES]
|
|
|
|
@property
|
|
def bgp_state(self):
|
|
return self._bgp_state
|
|
|
|
@bgp_state.setter
|
|
def bgp_state(self, new_state):
|
|
old_state = self._bgp_state
|
|
if old_state == new_state:
|
|
return
|
|
|
|
self._bgp_state = new_state
|
|
NET_CONTROLLER.send_rpc_notification(
|
|
'neighbor.state',
|
|
{
|
|
'ip_address': self.peer.ip_address,
|
|
'state': new_state
|
|
}
|
|
)
|
|
|
|
# transition to Established from another state
|
|
if new_state == const.BGP_FSM_ESTABLISHED:
|
|
self.incr(PeerCounterNames.FSM_ESTB_TRANSITIONS)
|
|
self._established_time = time.time()
|
|
self._signal_bus.adj_up(self.peer)
|
|
NET_CONTROLLER.send_rpc_notification(
|
|
'neighbor.up', {'ip_address': self.peer.ip_address}
|
|
)
|
|
# transition from Established to another state
|
|
elif old_state == const.BGP_FSM_ESTABLISHED:
|
|
self._established_time = 0
|
|
self._signal_bus.adj_down(self.peer)
|
|
NET_CONTROLLER.send_rpc_notification(
|
|
'neighbor.down', {'ip_address': self.peer.ip_address}
|
|
)
|
|
|
|
LOG.debug('Peer %s BGP FSM went from %s to %s',
|
|
self.peer.ip_address, old_state, self.bgp_state)
|
|
|
|
def incr(self, counter_name, incr_by=1):
|
|
if counter_name not in self.counters:
|
|
raise ValueError('Un-recognized counter name: %s' % counter_name)
|
|
counter = self.counters.setdefault(counter_name, 0)
|
|
counter += incr_by
|
|
self.counters[counter_name] = counter
|
|
|
|
def get_count(self, counter_name):
|
|
if counter_name not in self.counters:
|
|
raise ValueError('Un-recognized counter name: %s' % counter_name)
|
|
return self.counters.get(counter_name, 0)
|
|
|
|
@property
|
|
def total_msg_sent(self):
|
|
"""Returns total number of UPDATE, NOTIFICATION and ROUTE_REFRESH
|
|
message sent to this peer.
|
|
"""
|
|
return (self.get_count(PeerCounterNames.SENT_REFRESH) +
|
|
self.get_count(PeerCounterNames.SENT_UPDATES))
|
|
|
|
@property
|
|
def total_msg_recv(self):
|
|
"""Returns total number of UPDATE, NOTIFICATION and ROUTE_REFRESH
|
|
messages received from this peer.
|
|
"""
|
|
return (self.get_count(PeerCounterNames.RECV_UPDATES) +
|
|
self.get_count(PeerCounterNames.RECV_REFRESH) +
|
|
self.get_count(PeerCounterNames.RECV_NOTIFICATION))
|
|
|
|
def get_stats_summary_dict(self):
|
|
"""Returns basic stats.
|
|
|
|
Returns a `dict` with various counts and stats, see below.
|
|
"""
|
|
uptime = time.time() - self._established_time \
|
|
if self._established_time != 0 else -1
|
|
return {
|
|
stats.UPDATE_MSG_IN: self.get_count(PeerCounterNames.RECV_UPDATES),
|
|
stats.UPDATE_MSG_OUT: self.get_count(
|
|
PeerCounterNames.SENT_UPDATES
|
|
),
|
|
stats.TOTAL_MSG_IN: self.total_msg_recv,
|
|
stats.TOTAL_MSG_OUT: self.total_msg_sent,
|
|
stats.FMS_EST_TRANS: self.get_count(
|
|
PeerCounterNames.FSM_ESTB_TRANSITIONS
|
|
),
|
|
stats.UPTIME: uptime
|
|
}
|
|
|
|
|
|
class Peer(Source, Sink, NeighborConfListener, Activity):
|
|
"""A BGP neighbor/peer.
|
|
|
|
Listens on neighbor configuration changes and handles change events
|
|
appropriately. If peering is enabled tries 'actively'/'pro-actively' to
|
|
establish session with peer. Allows binding of `BgpProtocol` instances to
|
|
allow 'passive'/'reactive' establishment of bgp session with peer.
|
|
Maintains BGP state machine (may not be fully compliant with RFC). Handles
|
|
bgp UPDATE messages. Provides a queue to send update message to peer.
|
|
"""
|
|
|
|
RTC_EOR_TIMER_NAME = 'RTC_EOR_Timer'
|
|
|
|
def __init__(self, common_conf, neigh_conf,
|
|
core_service, signal_bus, peer_manager):
|
|
peer_activity_name = 'Peer: %s' % neigh_conf.ip_address
|
|
Activity.__init__(self, name=peer_activity_name)
|
|
Source.__init__(self, version_num=1)
|
|
Sink.__init__(self)
|
|
# Add listener for configuration changes.
|
|
NeighborConfListener.__init__(self, neigh_conf)
|
|
|
|
# Current configuration of this peer.
|
|
self._neigh_conf = neigh_conf
|
|
self._common_conf = common_conf
|
|
self._core_service = core_service
|
|
self._signal_bus = signal_bus
|
|
self._peer_manager = peer_manager
|
|
|
|
# Host Bind IP
|
|
self._host_bind_ip = None
|
|
self._host_bind_port = None
|
|
|
|
# TODO(PH): revisit maintaining state/stats information.
|
|
# Peer state.
|
|
self.state = PeerState(self, self._signal_bus)
|
|
self._periodic_stats_logger = \
|
|
self._create_timer('Peer State Summary Stats Timer',
|
|
stats.log,
|
|
stats_resource=self._neigh_conf,
|
|
stats_source=self.state.get_stats_summary_dict)
|
|
if self._neigh_conf.stats_log_enabled:
|
|
self._periodic_stats_logger.start(self._neigh_conf.stats_time)
|
|
|
|
# State per route family, {RouteFamily: PeerRf,}.
|
|
self.rf_state = {}
|
|
# Get vpnv4 route family settings.
|
|
prf = PeerRf(self, RF_IPv4_VPN,
|
|
enabled=self._neigh_conf.cap_mbgp_vpnv4)
|
|
self.rf_state[RF_IPv4_VPN] = prf
|
|
# Get vpnv6 route family settings.
|
|
prf = PeerRf(self, RF_IPv6_VPN, self._neigh_conf.cap_mbgp_vpnv6)
|
|
self.rf_state[RF_IPv6_VPN] = prf
|
|
|
|
# Bound protocol instance
|
|
self._protocol = None
|
|
|
|
# Setting this event starts the connect_loop loop again
|
|
# Clearing this event will stop the connect_loop loop
|
|
self._connect_retry_event = EventletIOFactory.create_custom_event()
|
|
|
|
# Reference to threads related to enhanced refresh timers.
|
|
self._refresh_stalepath_timer = None
|
|
self._refresh_max_eor_timer = None
|
|
|
|
# Latest valid Open Message
|
|
self.curr_open_msg = None
|
|
|
|
# RTC end-of-rib timer
|
|
self._rtc_eor_timer = None
|
|
self._sent_init_non_rtc_update = False
|
|
self._init_rtc_nlri_path = []
|
|
|
|
# in-bound filters
|
|
self._in_filters = self._neigh_conf.in_filter
|
|
|
|
# out-bound filters
|
|
self._out_filters = self._neigh_conf.out_filter
|
|
|
|
# Adj-rib-in
|
|
self._adj_rib_in = {}
|
|
|
|
# Adj-rib-out
|
|
self._adj_rib_out = {}
|
|
|
|
# attribute maps
|
|
self._attribute_maps = {}
|
|
|
|
@property
|
|
def remote_as(self):
|
|
return self._neigh_conf.remote_as
|
|
|
|
@property
|
|
def rtc_as(self):
|
|
return self._neigh_conf.rtc_as
|
|
|
|
@property
|
|
def ip_address(self):
|
|
return self._neigh_conf.ip_address
|
|
|
|
@property
|
|
def protocol(self):
|
|
return self._protocol
|
|
|
|
@property
|
|
def host_bind_ip(self):
|
|
return self._host_bind_ip
|
|
|
|
@property
|
|
def host_bind_port(self):
|
|
return self._host_bind_port
|
|
|
|
@property
|
|
def enabled(self):
|
|
return self._neigh_conf.enabled
|
|
|
|
@property
|
|
def med(self):
|
|
return self._neigh_conf.multi_exit_disc
|
|
|
|
@property
|
|
def local_as(self):
|
|
return self._neigh_conf.local_as
|
|
|
|
@property
|
|
def cap_four_octet_as_number(self):
|
|
return self._neigh_conf.cap_four_octet_as_number
|
|
|
|
@property
|
|
def in_filters(self):
|
|
return self._in_filters
|
|
|
|
@in_filters.setter
|
|
def in_filters(self, filters):
|
|
self._in_filters = [f.clone() for f in filters]
|
|
LOG.debug('set in-filter : %s', filters)
|
|
self.on_update_in_filter()
|
|
|
|
@property
|
|
def out_filters(self):
|
|
return self._out_filters
|
|
|
|
@out_filters.setter
|
|
def out_filters(self, filters):
|
|
self._out_filters = [f.clone() for f in filters]
|
|
LOG.debug('set out-filter : %s', filters)
|
|
self.on_update_out_filter()
|
|
|
|
@property
|
|
def adj_rib_in(self):
|
|
return self._adj_rib_in
|
|
|
|
@property
|
|
def adj_rib_out(self):
|
|
return self._adj_rib_out
|
|
|
|
@property
|
|
def is_route_server_client(self):
|
|
return self._neigh_conf.is_route_server_client
|
|
|
|
@property
|
|
def is_route_reflector_client(self):
|
|
return self._neigh_conf.is_route_reflector_client
|
|
|
|
@property
|
|
def check_first_as(self):
|
|
return self._neigh_conf.check_first_as
|
|
|
|
@property
|
|
def connect_mode(self):
|
|
return self._neigh_conf.connect_mode
|
|
|
|
@property
|
|
def attribute_maps(self):
|
|
return self._attribute_maps
|
|
|
|
@attribute_maps.setter
|
|
def attribute_maps(self, attribute_maps):
|
|
_attr_maps = {}
|
|
_attr_maps.setdefault(const.ATTR_MAPS_ORG_KEY, [])
|
|
|
|
# key is 'default' or rd_rf that represents RD and route_family
|
|
key = attribute_maps[const.ATTR_MAPS_LABEL_KEY]
|
|
at_maps = attribute_maps[const.ATTR_MAPS_VALUE]
|
|
|
|
for a in at_maps:
|
|
cloned = a.clone()
|
|
LOG.debug("AttributeMap attr_type: %s, attr_value: %s",
|
|
cloned.attr_type, cloned.attr_value)
|
|
attr_list = _attr_maps.setdefault(cloned.attr_type, [])
|
|
attr_list.append(cloned)
|
|
|
|
# preserve original order of attribute_maps
|
|
_attr_maps[const.ATTR_MAPS_ORG_KEY].append(cloned)
|
|
|
|
self._attribute_maps[key] = _attr_maps
|
|
self.on_update_attribute_maps()
|
|
|
|
def is_mpbgp_cap_valid(self, route_family):
|
|
if not self.in_established:
|
|
raise ValueError('Invalid request: Peer not in established state')
|
|
return self._protocol.is_mbgp_cap_valid(route_family)
|
|
|
|
def is_four_octet_as_number_cap_valid(self):
|
|
if not self.in_established:
|
|
raise ValueError('Invalid request: Peer not in established state')
|
|
return self._protocol.is_four_octet_as_number_cap_valid()
|
|
|
|
def is_ebgp_peer(self):
|
|
"""Returns *True* if this is a eBGP peer, else *False*."""
|
|
return self._common_conf.local_as != self._neigh_conf.remote_as
|
|
|
|
def in_established(self):
|
|
return self.state.bgp_state == const.BGP_FSM_ESTABLISHED
|
|
|
|
def in_idle(self):
|
|
return self.state.bgp_state == const.BGP_FSM_IDLE
|
|
|
|
def in_active(self):
|
|
return self.state.bgp_state == const.BGP_FSM_ACTIVE
|
|
|
|
def in_open_sent(self):
|
|
return self.state.bgp_state == const.BGP_FSM_OPEN_SENT
|
|
|
|
def in_open_confirm(self):
|
|
return self.state.bgp_state == const.BGP_FSM_OPEN_CONFIRM
|
|
|
|
def in_connect(self):
|
|
return self.state.bgp_state == const.BGP_FSM_CONNECT
|
|
|
|
def curr_fms_state(self):
|
|
return self.state.bgp_state
|
|
|
|
def is_mbgp_cap_valid(self, route_family):
|
|
if not self.in_established():
|
|
return False
|
|
|
|
return self._protocol.is_mbgp_cap_valid(route_family)
|
|
|
|
def on_chg_stats_time_conf_with_stats(self, evt):
|
|
# TODO(PH): provide implementation when updating neighbor is needed
|
|
pass
|
|
|
|
def on_chg_stats_enabled_conf_with_stats(self, evt):
|
|
# TODO(PH): provide implementation when updating neighbor is needed
|
|
pass
|
|
|
|
def on_update_enabled(self, conf_evt):
|
|
"""Implements neighbor configuration change listener.
|
|
"""
|
|
enabled = conf_evt.value
|
|
# If we do not have any protocol bound and configuration asks us to
|
|
# enable this peer, we try to establish connection again.
|
|
if enabled:
|
|
LOG.info('%s enabled', self)
|
|
if self._protocol and self._protocol.started:
|
|
LOG.error('Tried to enable neighbor that is already enabled')
|
|
else:
|
|
self.state.bgp_state = const.BGP_FSM_CONNECT
|
|
# Restart connect loop if not already running.
|
|
if not self._connect_retry_event.is_set():
|
|
self._connect_retry_event.set()
|
|
LOG.debug('Starting connect loop as neighbor is enabled.')
|
|
else:
|
|
LOG.info('%s disabled', self)
|
|
if self._protocol:
|
|
# Stopping protocol will eventually trigger connection_lost
|
|
# handler which will do some clean-up.
|
|
# But the greenlet that is in charge of the socket may be kill
|
|
# when we stop the protocol, hence we call connection_lost
|
|
# here as we triggered socket to close.
|
|
self._protocol.send_notification(
|
|
BGP_ERROR_CEASE,
|
|
BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN
|
|
)
|
|
self._protocol.stop()
|
|
self._protocol = None
|
|
self.state.bgp_state = const.BGP_FSM_IDLE
|
|
# If this peer is not enabled any-more we stop trying to make any
|
|
# connection.
|
|
LOG.debug('Disabling connect-retry as neighbor was disabled')
|
|
self._connect_retry_event.clear()
|
|
|
|
def on_update_med(self, conf_evt):
|
|
LOG.debug('on_update_med fired')
|
|
if self._protocol is not None and self._protocol.started:
|
|
negotiated_afs = self._protocol.negotiated_afs
|
|
for af in negotiated_afs:
|
|
self._fire_route_refresh(af)
|
|
|
|
def _on_update_connect_mode(self, mode):
|
|
if mode is not CONNECT_MODE_PASSIVE and \
|
|
'peer.connect_loop' not in self._child_thread_map:
|
|
LOG.debug("start connect loop. (mode: %s)", mode)
|
|
self._spawn('peer.connect_loop', self._connect_loop,
|
|
self._client_factory)
|
|
elif mode is CONNECT_MODE_PASSIVE:
|
|
LOG.debug("stop connect loop. (mode: %s)", mode)
|
|
self._stop_child_threads('peer.connect_loop')
|
|
|
|
def on_update_connect_mode(self, conf_evt):
|
|
self._on_update_connect_mode(conf_evt.value)
|
|
|
|
def _apply_filter(self, filters, path):
|
|
block = False
|
|
blocked_cause = None
|
|
|
|
for filter_ in filters:
|
|
if filter_.ROUTE_FAMILY != path.ROUTE_FAMILY:
|
|
continue
|
|
|
|
policy, is_matched = filter_.evaluate(path)
|
|
if policy == PrefixFilter.POLICY_PERMIT and is_matched:
|
|
block = False
|
|
break
|
|
elif policy == PrefixFilter.POLICY_DENY and is_matched:
|
|
block = True
|
|
blocked_cause = filter_.prefix + ' - DENY'
|
|
break
|
|
|
|
return block, blocked_cause
|
|
|
|
def _apply_in_filter(self, path):
|
|
return self._apply_filter(self._in_filters, path)
|
|
|
|
def _apply_out_filter(self, path):
|
|
return self._apply_filter(self._out_filters, path)
|
|
|
|
def on_update_in_filter(self):
|
|
LOG.debug('on_update_in_filter fired')
|
|
for received_path in self._adj_rib_in.values():
|
|
LOG.debug('received_path: %s', received_path)
|
|
path = received_path.path
|
|
nlri_str = path.nlri.formatted_nlri_str
|
|
block, blocked_reason = self._apply_in_filter(path)
|
|
if block == received_path.filtered:
|
|
LOG.debug('block situation not changed: %s', block)
|
|
continue
|
|
elif block:
|
|
# path wasn't blocked, but must be blocked by this update
|
|
path = path.clone(for_withdrawal=True)
|
|
LOG.debug('withdraw %s because of in filter update', nlri_str)
|
|
else:
|
|
# path was blocked, but mustn't be blocked by this update
|
|
LOG.debug('learn blocked %s because of in filter update',
|
|
nlri_str)
|
|
received_path.filtered = block
|
|
tm = self._core_service.table_manager
|
|
tm.learn_path(path)
|
|
|
|
def on_update_out_filter(self):
|
|
LOG.debug('on_update_out_filter fired')
|
|
for sent_path in self._adj_rib_out.values():
|
|
LOG.debug('sent_path: %s', sent_path)
|
|
path = sent_path.path
|
|
nlri_str = path.nlri.formatted_nlri_str
|
|
block, blocked_reason = self._apply_out_filter(path)
|
|
if block == sent_path.filtered:
|
|
LOG.debug('block situation not changed: %s', block)
|
|
continue
|
|
elif block:
|
|
# path wasn't blocked, but must be blocked by this update
|
|
withdraw_clone = path.clone(for_withdrawal=True)
|
|
outgoing_route = OutgoingRoute(withdraw_clone)
|
|
LOG.debug('send withdraw %s because of out filter update',
|
|
nlri_str)
|
|
else:
|
|
# path was blocked, but mustn't be blocked by this update
|
|
outgoing_route = OutgoingRoute(path)
|
|
LOG.debug('send blocked %s because of out filter update',
|
|
nlri_str)
|
|
sent_path.filtered = block
|
|
self.enque_outgoing_msg(outgoing_route)
|
|
|
|
def on_update_attribute_maps(self):
|
|
# resend sent_route in case of filter matching
|
|
LOG.debug('on_update_attribute_maps fired')
|
|
for sent_path in self._adj_rib_out.values():
|
|
LOG.debug('resend path: %s', sent_path)
|
|
path = sent_path.path
|
|
self.enque_outgoing_msg(OutgoingRoute(path))
|
|
|
|
def __str__(self):
|
|
return 'Peer(ip: %s, asn: %s)' % (self._neigh_conf.ip_address,
|
|
self._neigh_conf.remote_as)
|
|
|
|
def _run(self, client_factory):
|
|
LOG.debug('Started peer %s', self)
|
|
self._client_factory = client_factory
|
|
|
|
# Tries actively to establish session if CONNECT_MODE is not PASSIVE
|
|
self._on_update_connect_mode(self._neigh_conf.connect_mode)
|
|
|
|
# Start sink processing
|
|
self._process_outgoing_msg_list()
|
|
|
|
def _send_outgoing_route_refresh_msg(self, rr_msg):
|
|
"""Sends given message `rr_msg` to peer.
|
|
|
|
Parameters:
|
|
- rr_msg: (RouteRefresh) route refresh message to send to peer.
|
|
|
|
Update appropriate counters and set appropriate timers.
|
|
"""
|
|
assert rr_msg.type == BGP_MSG_ROUTE_REFRESH
|
|
self._protocol.send(rr_msg)
|
|
LOG.debug('RouteRefresh %s>> %s',
|
|
self._neigh_conf.ip_address, rr_msg)
|
|
# Collect update statistics for sent refresh request.
|
|
if rr_msg.demarcation == 0:
|
|
self.state.incr(PeerCounterNames.SENT_REFRESH)
|
|
# If SOR is sent, we set Max. EOR timer if needed.
|
|
elif (rr_msg.demarcation == 1 and
|
|
self._common_conf.refresh_max_eor_time != 0):
|
|
eor_timer = self._common_conf.refresh_max_eor_time
|
|
# Set timer to send EOR demarcation.
|
|
self._spawn_after('end-of-rib-timer', eor_timer,
|
|
self._enqueue_eor_msg, rr_msg)
|
|
LOG.debug('Enhanced RR max. EOR timer set.')
|
|
|
|
def _send_outgoing_route(self, outgoing_route):
|
|
"""Constructs `Update` message from given `outgoing_route` and sends
|
|
it to peer.
|
|
|
|
Also, checks if any policies prevent sending this message.
|
|
Populates Adj-RIB-out with corresponding `SentRoute`.
|
|
"""
|
|
|
|
path = outgoing_route.path
|
|
block, blocked_cause = self._apply_out_filter(path)
|
|
|
|
nlri_str = outgoing_route.path.nlri.formatted_nlri_str
|
|
sent_route = SentRoute(outgoing_route.path, self, block)
|
|
self._adj_rib_out[nlri_str] = sent_route
|
|
self._signal_bus.adj_rib_out_changed(self, sent_route)
|
|
|
|
# TODO(PH): optimized by sending several prefixes per update.
|
|
# Construct and send update message.
|
|
if not block:
|
|
update_msg = self._construct_update(outgoing_route)
|
|
self._protocol.send(update_msg)
|
|
# Collect update statistics.
|
|
self.state.incr(PeerCounterNames.SENT_UPDATES)
|
|
else:
|
|
LOG.debug('prefix : %s is not sent by filter : %s',
|
|
path.nlri, blocked_cause)
|
|
|
|
# We have to create sent_route for every OutgoingRoute which is
|
|
# not a withdraw or was for route-refresh msg.
|
|
if (not outgoing_route.path.is_withdraw and
|
|
not outgoing_route.for_route_refresh):
|
|
# Update the destination with new sent route.
|
|
tm = self._core_service.table_manager
|
|
tm.remember_sent_route(sent_route)
|
|
|
|
def _process_outgoing_msg_list(self):
|
|
while True:
|
|
outgoing_msg = None
|
|
|
|
if self._protocol is not None:
|
|
# We pick the first outgoing msg. available and send it.
|
|
outgoing_msg = self.outgoing_msg_list.pop_first()
|
|
|
|
# If we do not have any outgoing route, we wait.
|
|
if outgoing_msg is None:
|
|
self.outgoing_msg_event.clear()
|
|
self.outgoing_msg_event.wait()
|
|
continue
|
|
|
|
# Check currently supported out-going msgs.
|
|
assert isinstance(
|
|
outgoing_msg,
|
|
(BGPRouteRefresh, BGPUpdate, OutgoingRoute)
|
|
), ('Peer cannot process object: %s in its outgoing queue'
|
|
% outgoing_msg)
|
|
|
|
# Send msg. to peer.
|
|
if isinstance(outgoing_msg, BGPRouteRefresh):
|
|
self._send_outgoing_route_refresh_msg(outgoing_msg)
|
|
elif isinstance(outgoing_msg, OutgoingRoute):
|
|
self._send_outgoing_route(outgoing_msg)
|
|
|
|
# EOR are enqueued as plain Update messages.
|
|
elif isinstance(outgoing_msg, BGPUpdate):
|
|
self._protocol.send(outgoing_msg)
|
|
LOG.debug('Update %s>> %s', self._neigh_conf.ip_address,
|
|
outgoing_msg)
|
|
self.state.incr(PeerCounterNames.SENT_UPDATES)
|
|
|
|
def request_route_refresh(self, *route_families):
|
|
"""Request route refresh to peer for given `route_families`.
|
|
|
|
If no `route_families` are given, we make request for all supported
|
|
route families with this peer.
|
|
Parameters:
|
|
- `route_families`: list of route families to request route
|
|
refresh for.
|
|
|
|
If this peer is currently not in Established state, we raise exception.
|
|
If any of the `route_families` are invalid we raise exception.
|
|
"""
|
|
# If this peer has not established session yet
|
|
if not self.in_established:
|
|
raise ValueError('Peer not in established state to satisfy'
|
|
' this request.')
|
|
|
|
skip_validation = False
|
|
# If request is made for all supported route_families for current
|
|
# session, we collect all route_families for valid for current session.
|
|
if len(route_families) == 0:
|
|
route_families = []
|
|
# We skip validation of route families that we collect ourselves
|
|
# below.
|
|
skip_validation = True
|
|
for route_family in SUPPORTED_GLOBAL_RF:
|
|
if self.is_mbgp_cap_valid(route_family):
|
|
route_families.append(route_family)
|
|
|
|
for route_family in route_families:
|
|
if (skip_validation or
|
|
((route_family in SUPPORTED_GLOBAL_RF) and
|
|
# We ignore request for route_family not valid
|
|
# for current session.
|
|
self._protocol.is_mbgp_cap_valid(route_family))):
|
|
rr_req = BGPRouteRefresh(route_family.afi, route_family.safi)
|
|
self.enque_outgoing_msg(rr_req)
|
|
LOG.debug('Enqueued Route Refresh message to '
|
|
'peer %s for rf: %s', self, route_family)
|
|
|
|
def enque_end_of_rib(self, route_family):
|
|
# MP_UNREACH_NLRI Attribute.
|
|
mpunreach_attr = BGPPathAttributeMpUnreachNLRI(route_family.afi,
|
|
route_family.safi,
|
|
[])
|
|
update = BGPUpdate(path_attributes=[mpunreach_attr])
|
|
self.enque_outgoing_msg(update)
|
|
|
|
def _session_next_hop(self, path):
|
|
"""Returns nexthop address relevant to current session
|
|
|
|
Nexthop used can depend on capabilities of the session. If VPNv6
|
|
capability is active and session is on IPv4 connection, we have to use
|
|
IPv4 mapped IPv6 address. In other cases we can use connection end
|
|
point/local ip address.
|
|
"""
|
|
route_family = path.route_family
|
|
|
|
# By default we use BGPS's interface IP with this peer as next_hop.
|
|
if self._neigh_conf.next_hop:
|
|
next_hop = self._neigh_conf.next_hop
|
|
else:
|
|
next_hop = self.host_bind_ip
|
|
if route_family == RF_IPv6_VPN:
|
|
next_hop = self._ipv4_mapped_ipv6(next_hop)
|
|
|
|
return next_hop
|
|
|
|
@staticmethod
|
|
def _ipv4_mapped_ipv6(ipv4_address):
|
|
# Next hop ipv4_mapped ipv6
|
|
from netaddr import IPAddress
|
|
return str(IPAddress(ipv4_address).ipv6())
|
|
|
|
def _construct_as_path_attr(self, as_path_attr, as4_path_attr):
|
|
"""Marge AS_PATH and AS4_PATH attribute instances into
|
|
a single AS_PATH instance."""
|
|
|
|
def _listify(li):
|
|
"""Reconstruct AS_PATH list.
|
|
|
|
Example::
|
|
|
|
>>> _listify([[1, 2, 3], {4, 5}, [6, 7]])
|
|
[1, 2, 3, {4, 5}, 6, 7]
|
|
"""
|
|
lo = []
|
|
for l in li:
|
|
if isinstance(l, list):
|
|
lo.extend(l)
|
|
elif isinstance(l, set):
|
|
lo.append(l)
|
|
else:
|
|
pass
|
|
return lo
|
|
|
|
# If AS4_PATH attribute is None, returns the given AS_PATH attribute
|
|
if as4_path_attr is None:
|
|
return as_path_attr
|
|
|
|
# If AS_PATH is shorter than AS4_PATH, AS4_PATH should be ignored.
|
|
if as_path_attr.get_as_path_len() < as4_path_attr.get_as_path_len():
|
|
return as_path_attr
|
|
|
|
org_as_path_list = _listify(as_path_attr.path_seg_list)
|
|
as4_path_list = _listify(as4_path_attr.path_seg_list)
|
|
|
|
# Reverse to compare backward.
|
|
org_as_path_list.reverse()
|
|
as4_path_list.reverse()
|
|
|
|
new_as_path_list = []
|
|
tmp_list = []
|
|
for as_path, as4_path in itertools.zip_longest(org_as_path_list,
|
|
as4_path_list):
|
|
if as4_path is None:
|
|
if isinstance(as_path, int):
|
|
tmp_list.insert(0, as_path)
|
|
elif isinstance(as_path, set):
|
|
if tmp_list:
|
|
new_as_path_list.insert(0, tmp_list)
|
|
tmp_list = []
|
|
new_as_path_list.insert(0, as_path)
|
|
else:
|
|
pass
|
|
elif isinstance(as4_path, int):
|
|
tmp_list.insert(0, as4_path)
|
|
elif isinstance(as4_path, set):
|
|
if tmp_list:
|
|
new_as_path_list.insert(0, tmp_list)
|
|
tmp_list = []
|
|
new_as_path_list.insert(0, as4_path)
|
|
else:
|
|
pass
|
|
if tmp_list:
|
|
new_as_path_list.insert(0, tmp_list)
|
|
|
|
return bgp.BGPPathAttributeAsPath(new_as_path_list)
|
|
|
|
def _trans_as_path(self, as_path_list):
|
|
"""Translates Four-Octet AS number to AS_TRANS and separates
|
|
AS_PATH list into AS_PATH and AS4_PATH lists if needed.
|
|
|
|
If the neighbor does not support Four-Octet AS number,
|
|
this method constructs AS4_PATH list from AS_PATH list and swaps
|
|
non-mappable AS number in AS_PATH with AS_TRANS, then
|
|
returns AS_PATH list and AS4_PATH list.
|
|
If the neighbor supports Four-Octet AS number, returns
|
|
the given AS_PATH list and None.
|
|
"""
|
|
|
|
def _swap(n):
|
|
if is_valid_old_asn(n):
|
|
# mappable
|
|
return n
|
|
else:
|
|
# non-mappable
|
|
return bgp.AS_TRANS
|
|
|
|
# If the neighbor supports Four-Octet AS number, returns
|
|
# the given AS_PATH list and None.
|
|
if self.is_four_octet_as_number_cap_valid():
|
|
return as_path_list, None
|
|
|
|
# If the neighbor does not support Four-Octet AS number,
|
|
# constructs AS4_PATH list from AS_PATH list and swaps
|
|
# non-mappable AS number in AS_PATH with AS_TRANS.
|
|
else:
|
|
new_as_path_list = []
|
|
for as_path in as_path_list:
|
|
if isinstance(as_path, set):
|
|
path_set = set()
|
|
for as_num in as_path:
|
|
path_set.add(_swap(as_num))
|
|
new_as_path_list.append(path_set)
|
|
elif isinstance(as_path, list):
|
|
path_list = list()
|
|
for as_num in as_path:
|
|
path_list.append(_swap(as_num))
|
|
new_as_path_list.append(path_list)
|
|
else:
|
|
# Ignore invalid as_path type
|
|
pass
|
|
|
|
# If all of the AS_PATH list is composed of mappable four-octet
|
|
# AS numbers only, returns the given AS_PATH list
|
|
# Assumption: If the constructed AS_PATH list is the same as
|
|
# the given AS_PATH list, all AS number is mappable.
|
|
if as_path_list == new_as_path_list:
|
|
return as_path_list, None
|
|
|
|
return new_as_path_list, as_path_list
|
|
|
|
def _construct_update(self, outgoing_route):
|
|
"""Construct update message with Outgoing-routes path attribute
|
|
appropriately cloned/copied/updated.
|
|
"""
|
|
update = None
|
|
path = outgoing_route.path
|
|
# Get copy of path's path attributes.
|
|
pathattr_map = path.pathattr_map
|
|
new_pathattr = []
|
|
|
|
if path.is_withdraw:
|
|
if isinstance(path, Ipv4Path):
|
|
update = BGPUpdate(withdrawn_routes=[path.nlri])
|
|
return update
|
|
else:
|
|
mpunreach_attr = BGPPathAttributeMpUnreachNLRI(
|
|
path.route_family.afi, path.route_family.safi, [path.nlri]
|
|
)
|
|
new_pathattr.append(mpunreach_attr)
|
|
elif self.is_route_server_client:
|
|
nlri_list = [path.nlri]
|
|
new_pathattr.extend(pathattr_map.values())
|
|
else:
|
|
if self.is_route_reflector_client:
|
|
# Append ORIGINATOR_ID attribute if not already exist.
|
|
if BGP_ATTR_TYPE_ORIGINATOR_ID not in pathattr_map:
|
|
originator_id = path.source
|
|
if originator_id is None:
|
|
originator_id = self._common_conf.router_id
|
|
elif isinstance(path.source, Peer):
|
|
originator_id = path.source.ip_address
|
|
new_pathattr.append(
|
|
BGPPathAttributeOriginatorId(value=originator_id))
|
|
|
|
# Preppend own CLUSTER_ID into CLUSTER_LIST attribute if exist.
|
|
# Otherwise append CLUSTER_LIST attribute.
|
|
cluster_lst_attr = pathattr_map.get(BGP_ATTR_TYPE_CLUSTER_LIST)
|
|
if cluster_lst_attr:
|
|
cluster_list = list(cluster_lst_attr.value)
|
|
if self._common_conf.cluster_id not in cluster_list:
|
|
cluster_list.insert(0, self._common_conf.cluster_id)
|
|
new_pathattr.append(
|
|
BGPPathAttributeClusterList(cluster_list))
|
|
else:
|
|
new_pathattr.append(
|
|
BGPPathAttributeClusterList(
|
|
[self._common_conf.cluster_id]))
|
|
|
|
# Supported and un-supported/unknown attributes.
|
|
origin_attr = None
|
|
nexthop_attr = None
|
|
as_path_attr = None
|
|
as4_path_attr = None
|
|
aggregator_attr = None
|
|
as4_aggregator_attr = None
|
|
extcomm_attr = None
|
|
community_attr = None
|
|
localpref_attr = None
|
|
pmsi_tunnel_attr = None
|
|
unknown_opttrans_attrs = None
|
|
nlri_list = [path.nlri]
|
|
|
|
if path.route_family.safi in (subaddr_family.IP_FLOWSPEC,
|
|
subaddr_family.VPN_FLOWSPEC):
|
|
# Flow Specification does not have next_hop.
|
|
next_hop = []
|
|
elif self.is_ebgp_peer():
|
|
next_hop = self._session_next_hop(path)
|
|
if path.is_local() and path.has_nexthop():
|
|
next_hop = path.nexthop
|
|
else:
|
|
next_hop = path.nexthop
|
|
# RFC 4271 allows us to change next_hop
|
|
# if configured to announce its own ip address.
|
|
# Also if the BGP route is configured without next_hop,
|
|
# we use path._session_next_hop() as next_hop.
|
|
if (self._neigh_conf.is_next_hop_self
|
|
or (path.is_local() and not path.has_nexthop())):
|
|
next_hop = self._session_next_hop(path)
|
|
LOG.debug('using %s as a next_hop address instead'
|
|
' of path.nexthop %s', next_hop, path.nexthop)
|
|
|
|
nexthop_attr = BGPPathAttributeNextHop(next_hop)
|
|
assert nexthop_attr, 'Missing NEXTHOP mandatory attribute.'
|
|
|
|
if not isinstance(path, Ipv4Path):
|
|
# We construct mpreach-nlri attribute.
|
|
mpnlri_attr = BGPPathAttributeMpReachNLRI(
|
|
path.route_family.afi,
|
|
path.route_family.safi,
|
|
next_hop,
|
|
nlri_list
|
|
)
|
|
|
|
# ORIGIN Attribute.
|
|
# According to RFC this attribute value SHOULD NOT be changed by
|
|
# any other speaker.
|
|
origin_attr = pathattr_map.get(BGP_ATTR_TYPE_ORIGIN)
|
|
assert origin_attr, 'Missing ORIGIN mandatory attribute.'
|
|
|
|
# AS_PATH Attribute.
|
|
# Construct AS-path-attr using paths AS_PATH attr. with local AS as
|
|
# first item.
|
|
path_aspath = pathattr_map.get(BGP_ATTR_TYPE_AS_PATH)
|
|
assert path_aspath, 'Missing AS_PATH mandatory attribute.'
|
|
# Deep copy AS_PATH attr value
|
|
as_path_list = path_aspath.path_seg_list
|
|
# If this is a iBGP peer.
|
|
if not self.is_ebgp_peer():
|
|
# When a given BGP speaker advertises the route to an internal
|
|
# peer, the advertising speaker SHALL NOT modify the AS_PATH
|
|
# attribute associated with the route.
|
|
pass
|
|
else:
|
|
# When a given BGP speaker advertises the route to an external
|
|
# peer, the advertising speaker updates the AS_PATH attribute
|
|
# as follows:
|
|
# 1) if the first path segment of the AS_PATH is of type
|
|
# AS_SEQUENCE, the local system prepends its own AS num as
|
|
# the last element of the sequence (put it in the left-most
|
|
# position with respect to the position of octets in the
|
|
# protocol message). If the act of prepending will cause an
|
|
# overflow in the AS_PATH segment (i.e., more than 255
|
|
# ASes), it SHOULD prepend a new segment of type AS_SEQUENCE
|
|
# and prepend its own AS number to this new segment.
|
|
#
|
|
# 2) if the first path segment of the AS_PATH is of type AS_SET
|
|
# , the local system prepends a new path segment of type
|
|
# AS_SEQUENCE to the AS_PATH, including its own AS number in
|
|
# that segment.
|
|
#
|
|
# 3) if the AS_PATH is empty, the local system creates a path
|
|
# segment of type AS_SEQUENCE, places its own AS into that
|
|
# segment, and places that segment into the AS_PATH.
|
|
if (len(as_path_list) > 0 and
|
|
isinstance(as_path_list[0], list) and
|
|
len(as_path_list[0]) < 255):
|
|
as_path_list[0].insert(0, self.local_as)
|
|
else:
|
|
as_path_list.insert(0, [self.local_as])
|
|
# Construct AS4_PATH list from AS_PATH list and swap
|
|
# non-mappable AS number with AS_TRANS in AS_PATH.
|
|
as_path_list, as4_path_list = self._trans_as_path(
|
|
as_path_list)
|
|
# If the neighbor supports Four-Octet AS number, send AS_PATH
|
|
# in Four-Octet.
|
|
if self.is_four_octet_as_number_cap_valid():
|
|
as_path_attr = BGPPathAttributeAsPath(
|
|
as_path_list, as_pack_str='!I') # specify Four-Octet.
|
|
# Otherwise, send AS_PATH in Two-Octet.
|
|
else:
|
|
as_path_attr = BGPPathAttributeAsPath(as_path_list)
|
|
# If needed, send AS4_PATH attribute.
|
|
if as4_path_list:
|
|
as4_path_attr = BGPPathAttributeAs4Path(as4_path_list)
|
|
|
|
# AGGREGATOR Attribute.
|
|
aggregator_attr = pathattr_map.get(BGP_ATTR_TYPE_AGGREGATOR)
|
|
# If the neighbor does not support Four-Octet AS number,
|
|
# swap non-mappable AS number with AS_TRANS.
|
|
if (aggregator_attr and
|
|
not self.is_four_octet_as_number_cap_valid()):
|
|
# If AS number of AGGREGATOR is Four-Octet AS number,
|
|
# swap with AS_TRANS, else do not.
|
|
aggregator_as_number = aggregator_attr.as_number
|
|
if not is_valid_old_asn(aggregator_as_number):
|
|
aggregator_attr = bgp.BGPPathAttributeAggregator(
|
|
bgp.AS_TRANS, aggregator_attr.addr)
|
|
as4_aggregator_attr = bgp.BGPPathAttributeAs4Aggregator(
|
|
aggregator_as_number, aggregator_attr.addr)
|
|
|
|
# MULTI_EXIT_DISC Attribute.
|
|
# For eBGP session we can send multi-exit-disc if configured.
|
|
multi_exit_disc = None
|
|
if self.is_ebgp_peer():
|
|
if self._neigh_conf.multi_exit_disc:
|
|
multi_exit_disc = BGPPathAttributeMultiExitDisc(
|
|
self._neigh_conf.multi_exit_disc
|
|
)
|
|
else:
|
|
pass
|
|
if not self.is_ebgp_peer():
|
|
multi_exit_disc = pathattr_map.get(
|
|
BGP_ATTR_TYPE_MULTI_EXIT_DISC)
|
|
|
|
# LOCAL_PREF Attribute.
|
|
if not self.is_ebgp_peer():
|
|
# For iBGP peers we are required to send local-pref attribute
|
|
# for connected or local prefixes. We check if the path matches
|
|
# attribute_maps and set local-pref value.
|
|
# If the path doesn't match, we set default local-pref given
|
|
# from the user. The default value is 100.
|
|
localpref_attr = BGPPathAttributeLocalPref(
|
|
self._common_conf.local_pref)
|
|
key = const.ATTR_MAPS_LABEL_DEFAULT
|
|
|
|
if isinstance(path, (Vpnv4Path, Vpnv6Path)):
|
|
nlri = nlri_list[0]
|
|
rf = VRF_RF_IPV4 if isinstance(path, Vpnv4Path)\
|
|
else VRF_RF_IPV6
|
|
key = ':'.join([nlri.route_dist, rf])
|
|
|
|
attr_type = AttributeMap.ATTR_LOCAL_PREF
|
|
at_maps = self._attribute_maps.get(key, {})
|
|
result = self._lookup_attribute_map(at_maps, attr_type, path)
|
|
if result:
|
|
localpref_attr = result
|
|
|
|
# COMMUNITY Attribute.
|
|
community_attr = pathattr_map.get(BGP_ATTR_TYPE_COMMUNITIES)
|
|
|
|
# EXTENDED COMMUNITY Attribute.
|
|
# Construct ExtCommunity path-attr based on given.
|
|
path_extcomm_attr = pathattr_map.get(
|
|
BGP_ATTR_TYPE_EXTENDED_COMMUNITIES
|
|
)
|
|
if path_extcomm_attr:
|
|
# SOO list can be configured per VRF and/or per Neighbor.
|
|
# NeighborConf has this setting we add this to existing list.
|
|
communities = path_extcomm_attr.communities
|
|
if self._neigh_conf.soo_list:
|
|
# construct extended community
|
|
soo_list = self._neigh_conf.soo_list
|
|
subtype = 0x03
|
|
for soo in soo_list:
|
|
first, second = soo.split(':')
|
|
if '.' in first:
|
|
c = BGPIPv4AddressSpecificExtendedCommunity(
|
|
subtype=subtype,
|
|
ipv4_address=first,
|
|
local_administrator=int(second))
|
|
else:
|
|
c = BGPTwoOctetAsSpecificExtendedCommunity(
|
|
subtype=subtype,
|
|
as_number=int(first),
|
|
local_administrator=int(second))
|
|
communities.append(c)
|
|
|
|
extcomm_attr = BGPPathAttributeExtendedCommunities(
|
|
communities=communities
|
|
)
|
|
|
|
pmsi_tunnel_attr = pathattr_map.get(
|
|
BGP_ATTR_TYEP_PMSI_TUNNEL_ATTRIBUTE
|
|
)
|
|
|
|
# UNKNOWN Attributes.
|
|
# Get optional transitive path attributes
|
|
unknown_opttrans_attrs = bgp_utils.get_unknown_opttrans_attr(path)
|
|
|
|
# Ordering path attributes according to type as RFC says. We set
|
|
# MPReachNLRI first as advised by experts as a new trend in BGP
|
|
# implementation.
|
|
if isinstance(path, Ipv4Path):
|
|
new_pathattr.append(nexthop_attr)
|
|
else:
|
|
new_pathattr.append(mpnlri_attr)
|
|
|
|
new_pathattr.append(origin_attr)
|
|
new_pathattr.append(as_path_attr)
|
|
if as4_path_attr:
|
|
new_pathattr.append(as4_path_attr)
|
|
if aggregator_attr:
|
|
new_pathattr.append(aggregator_attr)
|
|
if as4_aggregator_attr:
|
|
new_pathattr.append(as4_aggregator_attr)
|
|
if multi_exit_disc:
|
|
new_pathattr.append(multi_exit_disc)
|
|
if localpref_attr:
|
|
new_pathattr.append(localpref_attr)
|
|
if community_attr:
|
|
new_pathattr.append(community_attr)
|
|
if extcomm_attr:
|
|
new_pathattr.append(extcomm_attr)
|
|
if pmsi_tunnel_attr:
|
|
new_pathattr.append(pmsi_tunnel_attr)
|
|
if unknown_opttrans_attrs:
|
|
new_pathattr.extend(unknown_opttrans_attrs.values())
|
|
|
|
if isinstance(path, Ipv4Path):
|
|
update = BGPUpdate(path_attributes=new_pathattr,
|
|
nlri=nlri_list)
|
|
else:
|
|
update = BGPUpdate(path_attributes=new_pathattr)
|
|
return update
|
|
|
|
def _connect_loop(self, client_factory):
|
|
"""In the current greenlet we try to establish connection with peer.
|
|
|
|
This greenlet will spin another greenlet to handle incoming data
|
|
from the peer once connection is established.
|
|
"""
|
|
# If current configuration allow, enable active session establishment.
|
|
if self._neigh_conf.enabled:
|
|
self._connect_retry_event.set()
|
|
|
|
while True:
|
|
self._connect_retry_event.wait()
|
|
|
|
# Reconnecting immediately after closing connection may be not very
|
|
# well seen by some peers (ALU?)
|
|
self.pause(1.0)
|
|
if self.state.bgp_state in \
|
|
(const.BGP_FSM_IDLE, const.BGP_FSM_ACTIVE):
|
|
|
|
# Check if we have to stop or retry
|
|
self.state.bgp_state = const.BGP_FSM_CONNECT
|
|
# If we have specific host interface to bind to, we will do so
|
|
# else we will bind to system default.
|
|
if self._neigh_conf.host_bind_ip and \
|
|
self._neigh_conf.host_bind_port:
|
|
bind_addr = (self._neigh_conf.host_bind_ip,
|
|
self._neigh_conf.host_bind_port)
|
|
else:
|
|
bind_addr = None
|
|
peer_address = (self._neigh_conf.ip_address,
|
|
self._neigh_conf.port)
|
|
|
|
if bind_addr:
|
|
LOG.debug('%s trying to connect from'
|
|
'%s to %s', self, bind_addr, peer_address)
|
|
else:
|
|
LOG.debug('%s trying to connect to %s', self, peer_address)
|
|
tcp_conn_timeout = self._common_conf.tcp_conn_timeout
|
|
try:
|
|
password = self._neigh_conf.password
|
|
self._connect_tcp(peer_address,
|
|
client_factory,
|
|
time_out=tcp_conn_timeout,
|
|
bind_address=bind_addr,
|
|
password=password)
|
|
except socket.error:
|
|
self.state.bgp_state = const.BGP_FSM_ACTIVE
|
|
if LOG.isEnabledFor(logging.DEBUG):
|
|
LOG.debug('Socket could not be created in time'
|
|
' (%s secs), reason %s', tcp_conn_timeout,
|
|
traceback.format_exc())
|
|
LOG.info('Will try to reconnect to %s after %s secs: %s',
|
|
self._neigh_conf.ip_address,
|
|
self._common_conf.bgp_conn_retry_time,
|
|
self._connect_retry_event.is_set())
|
|
|
|
self.pause(self._common_conf.bgp_conn_retry_time)
|
|
|
|
def _set_protocol(self, proto):
|
|
self._protocol = proto
|
|
|
|
# Update state attributes
|
|
self.state.peer_ip, self.state.peer_port = self._protocol._remotename
|
|
self.state.local_ip, self.state.local_port = self._protocol._localname
|
|
# self.state.bgp_state = self._protocol.state
|
|
# Stop connect_loop retry timer as we are now connected
|
|
if self._protocol and self._connect_retry_event.is_set():
|
|
self._connect_retry_event.clear()
|
|
LOG.debug('Connect retry event for %s is cleared', self)
|
|
|
|
if self._protocol and self.outgoing_msg_event.is_set():
|
|
# Start processing sink.
|
|
self.outgoing_msg_event.set()
|
|
LOG.debug('Processing of outgoing msg. started for %s.', self)
|
|
|
|
def _send_collision_err_and_stop(self, protocol):
|
|
code = BGP_ERROR_CEASE
|
|
subcode = BGP_ERROR_SUB_CONNECTION_COLLISION_RESOLUTION
|
|
self._signal_bus.bgp_error(self, code, subcode, None)
|
|
protocol.send_notification(code, subcode)
|
|
protocol.stop()
|
|
|
|
def bind_protocol(self, proto):
|
|
"""Tries to bind given protocol to this peer.
|
|
|
|
Should only be called by `proto` trying to bind.
|
|
Once bound this protocol instance will be used to communicate with
|
|
peer. If another protocol is already bound, connection collision
|
|
resolution takes place.
|
|
"""
|
|
LOG.debug('Trying to bind protocol %s to peer %s', proto, self)
|
|
# Validate input.
|
|
if not isinstance(proto, BgpProtocol):
|
|
raise ValueError('Currently only supports valid instances of'
|
|
' `BgpProtocol`')
|
|
|
|
if proto.state != const.BGP_FSM_OPEN_CONFIRM:
|
|
raise ValueError('Only protocols in OpenConfirm state can be'
|
|
' bound')
|
|
|
|
# If we are not bound to any protocol
|
|
is_bound = False
|
|
if not self._protocol:
|
|
self._set_protocol(proto)
|
|
is_bound = True
|
|
else:
|
|
# If existing protocol is already established, we raise exception.
|
|
if self.state.bgp_state != const.BGP_FSM_IDLE:
|
|
LOG.debug('Currently in %s state, hence will send collision'
|
|
' Notification to close this protocol.',
|
|
self.state.bgp_state)
|
|
self._send_collision_err_and_stop(proto)
|
|
return
|
|
|
|
# If we have a collision that need to be resolved
|
|
assert proto.is_colliding(self._protocol), \
|
|
('Tried to bind second protocol that is not colliding with '
|
|
'first/bound protocol')
|
|
LOG.debug('Currently have one protocol in %s state and '
|
|
'another protocol in %s state',
|
|
self._protocol.state, proto.state)
|
|
# Protocol that is already bound
|
|
first_protocol = self._protocol
|
|
assert ((first_protocol.is_reactive and not proto.is_reactive) or
|
|
(proto.is_reactive and not first_protocol.is_reactive))
|
|
# Connection initiated by peer.
|
|
reactive_proto = None
|
|
# Connection initiated locally.
|
|
proactive_proto = None
|
|
# Identify which protocol was initiated by which peer.
|
|
if proto.is_reactive:
|
|
reactive_proto = proto
|
|
proactive_proto = self._protocol
|
|
else:
|
|
reactive_proto = self._protocol
|
|
proactive_proto = proto
|
|
|
|
LOG.debug('Pro-active/Active protocol %s', proactive_proto)
|
|
# We compare bgp local and remote router id and keep the protocol
|
|
# that was initiated by peer with highest id.
|
|
if proto.is_local_router_id_greater():
|
|
self._set_protocol(proactive_proto)
|
|
else:
|
|
self._set_protocol(reactive_proto)
|
|
|
|
if self._protocol is not proto:
|
|
# If new proto did not win collision we return False to
|
|
# indicate this.
|
|
is_bound = False
|
|
else:
|
|
# If first protocol did not win collision resolution we
|
|
# we send notification to peer and stop it
|
|
self._send_collision_err_and_stop(first_protocol)
|
|
is_bound = True
|
|
|
|
return is_bound
|
|
|
|
def create_open_msg(self):
|
|
"""Create `Open` message using current settings.
|
|
|
|
Current setting include capabilities, timers and ids.
|
|
"""
|
|
asnum = self.local_as
|
|
# If local AS number is not Two-Octet AS number, swaps with AS_TRANS.
|
|
if not is_valid_old_asn(asnum):
|
|
asnum = bgp.AS_TRANS
|
|
bgpid = self._common_conf.router_id
|
|
holdtime = self._neigh_conf.hold_time
|
|
|
|
def flatten(L):
|
|
if isinstance(L, list):
|
|
for i in range(len(L)):
|
|
for e in flatten(L[i]):
|
|
yield e
|
|
else:
|
|
yield L
|
|
opts = list(flatten(
|
|
list(self._neigh_conf.get_configured_capabilities().values())))
|
|
open_msg = BGPOpen(
|
|
my_as=asnum,
|
|
bgp_identifier=bgpid,
|
|
version=const.BGP_VERSION_NUM,
|
|
hold_time=holdtime,
|
|
opt_param=opts
|
|
)
|
|
return open_msg
|
|
|
|
def _validate_update_msg(self, update_msg):
|
|
"""Validate update message as per RFC.
|
|
|
|
Here we validate the message after it has been parsed. Message
|
|
has already been validated against some errors inside parsing
|
|
library.
|
|
"""
|
|
# TODO(PH): finish providing implementation, currently low priority
|
|
assert update_msg.type == BGP_MSG_UPDATE
|
|
# An UPDATE message may be received only in the Established state.
|
|
# Receiving an UPDATE message in any other state is an error.
|
|
if self.state.bgp_state != const.BGP_FSM_ESTABLISHED:
|
|
LOG.error('Received UPDATE message when not in ESTABLISHED'
|
|
' state.')
|
|
raise bgp.FiniteStateMachineError()
|
|
|
|
mp_reach_attr = update_msg.get_path_attr(
|
|
BGP_ATTR_TYPE_MP_REACH_NLRI
|
|
)
|
|
mp_unreach_attr = update_msg.get_path_attr(
|
|
BGP_ATTR_TYPE_MP_UNREACH_NLRI
|
|
)
|
|
|
|
# non-MPBGP Update msg.
|
|
if not (mp_reach_attr or mp_unreach_attr):
|
|
if not self.is_mpbgp_cap_valid(RF_IPv4_UC):
|
|
LOG.error('Got UPDATE message with un-available'
|
|
' afi/safi %s', RF_IPv4_UC)
|
|
nlri_list = update_msg.nlri
|
|
if len(nlri_list) > 0:
|
|
# Check for missing well-known mandatory attributes.
|
|
aspath = update_msg.get_path_attr(BGP_ATTR_TYPE_AS_PATH)
|
|
if not aspath:
|
|
raise bgp.MissingWellKnown(
|
|
BGP_ATTR_TYPE_AS_PATH)
|
|
|
|
if (self.check_first_as and self.is_ebgp_peer() and
|
|
not aspath.has_matching_leftmost(self.remote_as)):
|
|
LOG.error('First AS check fails. Raise appropriate'
|
|
' exception.')
|
|
raise bgp.MalformedAsPath()
|
|
|
|
origin = update_msg.get_path_attr(BGP_ATTR_TYPE_ORIGIN)
|
|
if not origin:
|
|
raise bgp.MissingWellKnown(BGP_ATTR_TYPE_ORIGIN)
|
|
|
|
nexthop = update_msg.get_path_attr(BGP_ATTR_TYPE_NEXT_HOP)
|
|
if not nexthop:
|
|
raise bgp.MissingWellKnown(BGP_ATTR_TYPE_NEXT_HOP)
|
|
|
|
return True
|
|
|
|
# Check if received MP_UNREACH path attribute is of available afi/safi
|
|
if mp_unreach_attr:
|
|
if not self.is_mpbgp_cap_valid(mp_unreach_attr.route_family):
|
|
LOG.error('Got UPDATE message with un-available afi/safi for'
|
|
' MP_UNREACH path attribute (non-negotiated'
|
|
' afi/safi) %s', mp_unreach_attr.route_family)
|
|
# raise bgp.OptAttrError()
|
|
|
|
if mp_reach_attr:
|
|
# Check if received MP_REACH path attribute is of available
|
|
# afi/safi
|
|
if not self.is_mpbgp_cap_valid(mp_reach_attr.route_family):
|
|
LOG.error('Got UPDATE message with un-available afi/safi for'
|
|
' MP_UNREACH path attribute (non-negotiated'
|
|
' afi/safi) %s', mp_reach_attr.route_family)
|
|
# raise bgp.OptAttrError()
|
|
|
|
# Check for missing well-known mandatory attributes.
|
|
aspath = update_msg.get_path_attr(BGP_ATTR_TYPE_AS_PATH)
|
|
if not aspath:
|
|
raise bgp.MissingWellKnown(BGP_ATTR_TYPE_AS_PATH)
|
|
|
|
if (self.check_first_as and self.is_ebgp_peer() and
|
|
not aspath.has_matching_leftmost(self.remote_as)):
|
|
LOG.error('First AS check fails. Raise appropriate exception.')
|
|
raise bgp.MalformedAsPath()
|
|
|
|
origin = update_msg.get_path_attr(BGP_ATTR_TYPE_ORIGIN)
|
|
if not origin:
|
|
raise bgp.MissingWellKnown(BGP_ATTR_TYPE_ORIGIN)
|
|
|
|
# Validate Next hop.
|
|
if mp_reach_attr.route_family.safi in (
|
|
subaddr_family.IP_FLOWSPEC,
|
|
subaddr_family.VPN_FLOWSPEC):
|
|
# Because the Flow Specification does not have nexthop,
|
|
# skips check.
|
|
pass
|
|
elif (not mp_reach_attr.next_hop or
|
|
mp_reach_attr.next_hop == self.host_bind_ip):
|
|
LOG.error('Nexthop of received UPDATE msg. (%s) same as local'
|
|
' interface address %s.',
|
|
mp_reach_attr.next_hop,
|
|
self.host_bind_ip)
|
|
return False
|
|
|
|
return True
|
|
|
|
def _handle_update_msg(self, update_msg):
|
|
"""Extracts and processes new paths or withdrawals in given
|
|
`update_msg`.
|
|
|
|
Parameter:
|
|
- `update_msg`: update message to process.
|
|
- `valid_rts`: current valid/interesting rts to the application
|
|
according to configuration of all VRFs.
|
|
Assumes Multiprotocol Extensions capability is supported and enabled.
|
|
"""
|
|
assert self.state.bgp_state == const.BGP_FSM_ESTABLISHED
|
|
|
|
# Increment count of update received.
|
|
self.state.incr(PeerCounterNames.RECV_UPDATES)
|
|
|
|
if not self._validate_update_msg(update_msg):
|
|
# If update message was not valid for some reason, we ignore its
|
|
# routes.
|
|
LOG.error('UPDATE message was invalid, hence ignoring its routes.')
|
|
return
|
|
|
|
# Extract advertised path attributes and reconstruct AS_PATH attribute
|
|
self._extract_and_reconstruct_as_path(update_msg)
|
|
|
|
# Check if path attributes have loops.
|
|
if self._is_looped_path_attrs(update_msg):
|
|
return
|
|
|
|
umsg_pattrs = update_msg.pathattr_map
|
|
mp_reach_attr = umsg_pattrs.get(BGP_ATTR_TYPE_MP_REACH_NLRI, None)
|
|
if mp_reach_attr:
|
|
# Extract advertised MP-BGP paths from given message.
|
|
self._extract_and_handle_mpbgp_new_paths(update_msg)
|
|
|
|
mp_unreach_attr = umsg_pattrs.get(BGP_ATTR_TYPE_MP_UNREACH_NLRI, None)
|
|
if mp_unreach_attr:
|
|
# Extract MP-BGP withdraws from given message.
|
|
self._extract_and_handle_mpbgp_withdraws(mp_unreach_attr)
|
|
|
|
nlri_list = update_msg.nlri
|
|
if nlri_list:
|
|
# Extract advertised BGP paths from given message.
|
|
self._extract_and_handle_bgp4_new_paths(update_msg)
|
|
|
|
withdraw_list = update_msg.withdrawn_routes
|
|
if withdraw_list:
|
|
# Extract BGP withdraws from given message.
|
|
self._extract_and_handle_bgp4_withdraws(withdraw_list)
|
|
|
|
def _extract_and_reconstruct_as_path(self, update_msg):
|
|
"""Extracts advertised AS path attributes in the given update message
|
|
and reconstructs AS_PATH from AS_PATH and AS4_PATH if needed."""
|
|
umsg_pattrs = update_msg.pathattr_map
|
|
|
|
as_aggregator = umsg_pattrs.get(BGP_ATTR_TYPE_AGGREGATOR, None)
|
|
as4_aggregator = umsg_pattrs.get(BGP_ATTR_TYPE_AS4_AGGREGATOR, None)
|
|
if as_aggregator and as4_aggregator:
|
|
# When both AGGREGATOR and AS4_AGGREGATOR are received,
|
|
# if the AS number in the AGGREGATOR attribute is not AS_TRANS,
|
|
# then:
|
|
# - the AS4_AGGREGATOR attribute and the AS4_PATH attribute SHALL
|
|
# be ignored,
|
|
# - the AGGREGATOR attribute SHALL be taken as the information
|
|
# about the aggregating node, and
|
|
# - the AS_PATH attribute SHALL be taken as the AS path
|
|
# information.
|
|
if as_aggregator.as_number != bgp.AS_TRANS:
|
|
update_msg.path_attributes.remove(as4_aggregator)
|
|
as4_path = umsg_pattrs.pop(BGP_ATTR_TYPE_AS4_PATH, None)
|
|
if as4_path:
|
|
update_msg.path_attributes.remove(as4_path)
|
|
# Otherwise,
|
|
# - the AGGREGATOR attribute SHALL be ignored,
|
|
# - the AS4_AGGREGATOR attribute SHALL be taken as the
|
|
# information about the aggregating node, and
|
|
# - the AS path information would need to be constructed,
|
|
# as in all other cases.
|
|
else:
|
|
update_msg.path_attributes.remove(as_aggregator)
|
|
update_msg.path_attributes.remove(as4_aggregator)
|
|
update_msg.path_attributes.append(
|
|
bgp.BGPPathAttributeAggregator(
|
|
as_number=as4_aggregator.as_number,
|
|
addr=as4_aggregator.addr,
|
|
)
|
|
)
|
|
|
|
as_path = umsg_pattrs.get(BGP_ATTR_TYPE_AS_PATH, None)
|
|
as4_path = umsg_pattrs.get(BGP_ATTR_TYPE_AS4_PATH, None)
|
|
if as_path and as4_path:
|
|
# If the number of AS numbers in the AS_PATH attribute is
|
|
# less than the number of AS numbers in the AS4_PATH attribute,
|
|
# then the AS4_PATH attribute SHALL be ignored, and the AS_PATH
|
|
# attribute SHALL be taken as the AS path information.
|
|
if as_path.get_as_path_len() < as4_path.get_as_path_len():
|
|
update_msg.path_attributes.remove(as4_path)
|
|
|
|
# If the number of AS numbers in the AS_PATH attribute is larger
|
|
# than or equal to the number of AS numbers in the AS4_PATH
|
|
# attribute, then the AS path information SHALL be constructed
|
|
# by taking as many AS numbers and path segments as necessary
|
|
# from the leading part of the AS_PATH attribute, and then
|
|
# prepending them to the AS4_PATH attribute so that the AS path
|
|
# information has a number of AS numbers identical to that of
|
|
# the AS_PATH attribute.
|
|
else:
|
|
update_msg.path_attributes.remove(as_path)
|
|
update_msg.path_attributes.remove(as4_path)
|
|
as_path = self._construct_as_path_attr(as_path, as4_path)
|
|
update_msg.path_attributes.append(as_path)
|
|
|
|
def _is_looped_path_attrs(self, update_msg):
|
|
"""
|
|
Extracts path attributes from the given UPDATE message and checks
|
|
if the given attributes have loops or not.
|
|
|
|
:param update_msg: UPDATE message instance.
|
|
:return: True if attributes have loops. Otherwise False.
|
|
"""
|
|
umsg_pattrs = update_msg.pathattr_map
|
|
recv_open_msg = self.protocol.recv_open_msg
|
|
|
|
# Check if AS_PATH has loops.
|
|
aspath = umsg_pattrs.get(BGP_ATTR_TYPE_AS_PATH)
|
|
if (aspath is not None
|
|
and aspath.has_local_as(
|
|
self.local_as,
|
|
max_count=self._common_conf.allow_local_as_in_count)):
|
|
LOG.error(
|
|
'AS_PATH on UPDATE message has loops. '
|
|
'Ignoring this message: %s',
|
|
update_msg)
|
|
return
|
|
|
|
# Check if ORIGINATOR_ID has loops. [RFC4456]
|
|
originator_id = umsg_pattrs.get(BGP_ATTR_TYPE_ORIGINATOR_ID, None)
|
|
if (originator_id
|
|
and recv_open_msg.bgp_identifier == originator_id):
|
|
LOG.error(
|
|
'ORIGINATOR_ID on UPDATE message has loops. '
|
|
'Ignoring this message: %s',
|
|
update_msg)
|
|
return
|
|
|
|
# Check if CLUSTER_LIST has loops. [RFC4456]
|
|
cluster_list = umsg_pattrs.get(BGP_ATTR_TYPE_CLUSTER_LIST, None)
|
|
if (cluster_list
|
|
and self._common_conf.cluster_id in cluster_list.value):
|
|
LOG.error(
|
|
'CLUSTER_LIST on UPDATE message has loops. '
|
|
'Ignoring this message: %s', update_msg)
|
|
return
|
|
|
|
def _extract_and_handle_bgp4_new_paths(self, update_msg):
|
|
"""Extracts new paths advertised in the given update message's
|
|
*MpReachNlri* attribute.
|
|
|
|
Assumes MPBGP capability is enabled and message was validated.
|
|
Parameters:
|
|
- update_msg: (Update) is assumed to be checked for all bgp
|
|
message errors.
|
|
- valid_rts: (iterable) current valid/configured RTs.
|
|
|
|
Extracted paths are added to appropriate *Destination* for further
|
|
processing.
|
|
"""
|
|
umsg_pattrs = update_msg.pathattr_map
|
|
next_hop = update_msg.get_path_attr(BGP_ATTR_TYPE_NEXT_HOP).value
|
|
|
|
# Nothing to do if we do not have any new NLRIs in this message.
|
|
msg_nlri_list = update_msg.nlri
|
|
if not msg_nlri_list:
|
|
LOG.debug('Update message did not have any new MP_REACH_NLRIs.')
|
|
return
|
|
|
|
# Create path instances for each NLRI from the update message.
|
|
for msg_nlri in msg_nlri_list:
|
|
LOG.debug('NLRI: %s', msg_nlri)
|
|
new_path = bgp_utils.create_path(
|
|
self,
|
|
msg_nlri,
|
|
pattrs=umsg_pattrs,
|
|
nexthop=next_hop
|
|
)
|
|
LOG.debug('Extracted paths from Update msg.: %s', new_path)
|
|
|
|
block, blocked_cause = self._apply_in_filter(new_path)
|
|
|
|
nlri_str = new_path.nlri.formatted_nlri_str
|
|
received_route = ReceivedRoute(new_path, self, block)
|
|
self._adj_rib_in[nlri_str] = received_route
|
|
self._signal_bus.adj_rib_in_changed(self, received_route)
|
|
|
|
if not block:
|
|
# Update appropriate table with new paths.
|
|
tm = self._core_service.table_manager
|
|
tm.learn_path(new_path)
|
|
else:
|
|
LOG.debug('prefix : %s is blocked by in-bound filter: %s',
|
|
msg_nlri, blocked_cause)
|
|
|
|
# If update message had any qualifying new paths, do some book-keeping.
|
|
if msg_nlri_list:
|
|
# Update prefix statistics.
|
|
self.state.incr(PeerCounterNames.RECV_PREFIXES,
|
|
incr_by=len(msg_nlri_list))
|
|
# Check if we exceed max. prefixes allowed for this neighbor.
|
|
if self._neigh_conf.exceeds_max_prefix_allowed(
|
|
self.state.get_count(PeerCounterNames.RECV_PREFIXES)):
|
|
LOG.error('Max. prefix allowed for this neighbor '
|
|
'exceeded.')
|
|
|
|
def _extract_and_handle_bgp4_withdraws(self, withdraw_list):
|
|
"""Extracts withdraws advertised in the given update message's
|
|
*MpUnReachNlri* attribute.
|
|
|
|
Assumes MPBGP capability is enabled.
|
|
Parameters:
|
|
- update_msg: (Update) is assumed to be checked for all bgp
|
|
message errors.
|
|
|
|
Extracted withdraws are added to appropriate *Destination* for further
|
|
processing.
|
|
"""
|
|
msg_rf = RF_IPv4_UC
|
|
w_nlris = withdraw_list
|
|
if not w_nlris:
|
|
# If this is EOR of some kind, handle it
|
|
self._handle_eor(msg_rf)
|
|
|
|
for w_nlri in w_nlris:
|
|
w_path = bgp_utils.create_path(
|
|
self,
|
|
w_nlri,
|
|
is_withdraw=True
|
|
)
|
|
|
|
block, blocked_cause = self._apply_in_filter(w_path)
|
|
|
|
received_route = ReceivedRoute(w_path, self, block)
|
|
nlri_str = w_nlri.formatted_nlri_str
|
|
|
|
if nlri_str in self._adj_rib_in:
|
|
del self._adj_rib_in[nlri_str]
|
|
self._signal_bus.adj_rib_in_changed(self, received_route)
|
|
|
|
if not block:
|
|
# Update appropriate table with withdraws.
|
|
tm = self._core_service.table_manager
|
|
tm.learn_path(w_path)
|
|
else:
|
|
LOG.debug('prefix : %s is blocked by in-bound filter: %s',
|
|
nlri_str, blocked_cause)
|
|
|
|
def _extract_and_handle_mpbgp_new_paths(self, update_msg):
|
|
"""Extracts new paths advertised in the given update message's
|
|
*MpReachNlri* attribute.
|
|
|
|
Assumes MPBGP capability is enabled and message was validated.
|
|
Parameters:
|
|
- update_msg: (Update) is assumed to be checked for all bgp
|
|
message errors.
|
|
- valid_rts: (iterable) current valid/configured RTs.
|
|
|
|
Extracted paths are added to appropriate *Destination* for further
|
|
processing.
|
|
"""
|
|
umsg_pattrs = update_msg.pathattr_map
|
|
mpreach_nlri_attr = umsg_pattrs.get(BGP_ATTR_TYPE_MP_REACH_NLRI)
|
|
assert mpreach_nlri_attr
|
|
|
|
msg_rf = mpreach_nlri_attr.route_family
|
|
# Check if this route family is among supported route families.
|
|
if msg_rf not in SUPPORTED_GLOBAL_RF:
|
|
LOG.info(('Received route for route family %s which is'
|
|
' not supported. Ignoring paths from this UPDATE: %s') %
|
|
(msg_rf, update_msg))
|
|
return
|
|
|
|
if msg_rf in (RF_IPv4_VPN, RF_IPv6_VPN):
|
|
# Check if we have Extended Communities Attribute.
|
|
# TODO(PH): Check if RT_NLRI afi/safi will ever have this attribute
|
|
ext_comm_attr = umsg_pattrs.get(BGP_ATTR_TYPE_EXTENDED_COMMUNITIES)
|
|
# Check if we have at-least one RT is of interest to us.
|
|
if not ext_comm_attr:
|
|
LOG.info('Missing Extended Communities Attribute. '
|
|
'Ignoring paths from this UPDATE: %s', update_msg)
|
|
return
|
|
|
|
msg_rts = ext_comm_attr.rt_list
|
|
# If we do not have any RTs associated with this msg., we do not
|
|
# extract any paths.
|
|
if not msg_rts:
|
|
LOG.info('Received route with no RTs. Ignoring paths in this'
|
|
' UPDATE: %s', update_msg)
|
|
return
|
|
|
|
# If none of the RTs in the message are of interest, we do not
|
|
# extract any paths.
|
|
interested_rts = self._core_service.global_interested_rts
|
|
if not interested_rts.intersection(msg_rts):
|
|
LOG.info('Received route with RT %s that is of no interest to'
|
|
' any VRFs or Peers %s.'
|
|
' Ignoring paths from this UPDATE: %s',
|
|
msg_rts, interested_rts, update_msg)
|
|
return
|
|
|
|
next_hop = mpreach_nlri_attr.next_hop
|
|
|
|
# Nothing to do if we do not have any new NLRIs in this message.
|
|
msg_nlri_list = mpreach_nlri_attr.nlri
|
|
if not msg_nlri_list:
|
|
LOG.debug('Update message did not have any new MP_REACH_NLRIs.')
|
|
return
|
|
|
|
# Create path instances for each NLRI from the update message.
|
|
for msg_nlri in msg_nlri_list:
|
|
new_path = bgp_utils.create_path(
|
|
self,
|
|
msg_nlri,
|
|
pattrs=umsg_pattrs,
|
|
nexthop=next_hop
|
|
)
|
|
LOG.debug('Extracted paths from Update msg.: %s', new_path)
|
|
|
|
block, blocked_cause = self._apply_in_filter(new_path)
|
|
|
|
received_route = ReceivedRoute(new_path, self, block)
|
|
nlri_str = msg_nlri.formatted_nlri_str
|
|
self._adj_rib_in[nlri_str] = received_route
|
|
self._signal_bus.adj_rib_in_changed(self, received_route)
|
|
|
|
if not block:
|
|
if msg_rf == RF_RTC_UC \
|
|
and self._init_rtc_nlri_path is not None:
|
|
self._init_rtc_nlri_path.append(new_path)
|
|
else:
|
|
# Update appropriate table with new paths.
|
|
tm = self._core_service.table_manager
|
|
tm.learn_path(new_path)
|
|
else:
|
|
LOG.debug('prefix : %s is blocked by in-bound filter: %s',
|
|
msg_nlri, blocked_cause)
|
|
|
|
# If update message had any qualifying new paths, do some book-keeping.
|
|
if msg_nlri_list:
|
|
# Update prefix statistics.
|
|
self.state.incr(PeerCounterNames.RECV_PREFIXES,
|
|
incr_by=len(msg_nlri_list))
|
|
# Check if we exceed max. prefixes allowed for this neighbor.
|
|
if self._neigh_conf.exceeds_max_prefix_allowed(
|
|
self.state.get_count(PeerCounterNames.RECV_PREFIXES)):
|
|
LOG.error('Max. prefix allowed for this neighbor '
|
|
'exceeded.')
|
|
|
|
def _extract_and_handle_mpbgp_withdraws(self, mp_unreach_attr):
|
|
"""Extracts withdraws advertised in the given update message's
|
|
*MpUnReachNlri* attribute.
|
|
|
|
Assumes MPBGP capability is enabled.
|
|
Parameters:
|
|
- update_msg: (Update) is assumed to be checked for all bgp
|
|
message errors.
|
|
|
|
Extracted withdraws are added to appropriate *Destination* for further
|
|
processing.
|
|
"""
|
|
msg_rf = mp_unreach_attr.route_family
|
|
# Check if this route family is among supported route families.
|
|
if msg_rf not in SUPPORTED_GLOBAL_RF:
|
|
LOG.info(
|
|
'Received route family %s is not supported. '
|
|
'Ignoring withdraw routes on this UPDATE message.',
|
|
msg_rf)
|
|
return
|
|
|
|
w_nlris = mp_unreach_attr.withdrawn_routes
|
|
if not w_nlris:
|
|
# If this is EOR of some kind, handle it
|
|
self._handle_eor(msg_rf)
|
|
|
|
for w_nlri in w_nlris:
|
|
w_path = bgp_utils.create_path(
|
|
self,
|
|
w_nlri,
|
|
is_withdraw=True
|
|
)
|
|
block, blocked_cause = self._apply_in_filter(w_path)
|
|
|
|
received_route = ReceivedRoute(w_path, self, block)
|
|
nlri_str = w_nlri.formatted_nlri_str
|
|
|
|
if nlri_str in self._adj_rib_in:
|
|
del self._adj_rib_in[nlri_str]
|
|
self._signal_bus.adj_rib_in_changed(self, received_route)
|
|
|
|
if not block:
|
|
# Update appropriate table with withdraws.
|
|
tm = self._core_service.table_manager
|
|
tm.learn_path(w_path)
|
|
else:
|
|
LOG.debug('prefix : %s is blocked by in-bound filter: %s',
|
|
w_nlri, blocked_cause)
|
|
|
|
def _handle_eor(self, route_family):
|
|
"""Currently we only handle EOR for RTC address-family.
|
|
|
|
We send non-rtc initial updates if not already sent.
|
|
"""
|
|
LOG.debug('Handling EOR for %s', route_family)
|
|
# assert (route_family in SUPPORTED_GLOBAL_RF)
|
|
# assert self.is_mbgp_cap_valid(route_family)
|
|
|
|
if route_family == RF_RTC_UC:
|
|
self._unschedule_sending_init_updates()
|
|
|
|
# Learn all rt_nlri at the same time As RT are learned and RT
|
|
# filter get updated, qualifying NLRIs are automatically sent to
|
|
# peer including initial update
|
|
tm = self._core_service.table_manager
|
|
for rt_nlri in self._init_rtc_nlri_path:
|
|
tm.learn_path(rt_nlri)
|
|
# Give chance to process new RT_NLRI so that we have updated RT
|
|
# filter for all peer including this peer before we communicate
|
|
# NLRIs for other address-families
|
|
self.pause(0)
|
|
# Clear collection of initial RTs as we no longer need to wait for
|
|
# EOR for RT NLRIs and to indicate that new RT NLRIs should be
|
|
# handled in a regular fashion
|
|
self._init_rtc_nlri_path = None
|
|
|
|
def handle_msg(self, msg):
|
|
"""BGP message handler.
|
|
|
|
BGP message handling is shared between protocol instance and peer. Peer
|
|
only handles limited messages under suitable state. Here we handle
|
|
KEEPALIVE, UPDATE and ROUTE_REFRESH messages. UPDATE and ROUTE_REFRESH
|
|
messages are handled only after session is established.
|
|
"""
|
|
if msg.type == BGP_MSG_KEEPALIVE:
|
|
# If we receive a Keep Alive message in open_confirm state, we
|
|
# transition to established state.
|
|
if self.state.bgp_state == const.BGP_FSM_OPEN_CONFIRM:
|
|
self.state.bgp_state = const.BGP_FSM_ESTABLISHED
|
|
self._enqueue_init_updates()
|
|
|
|
elif msg.type == BGP_MSG_UPDATE:
|
|
assert self.state.bgp_state == const.BGP_FSM_ESTABLISHED
|
|
# Will try to process this UDPATE message further
|
|
self._handle_update_msg(msg)
|
|
|
|
elif msg.type == BGP_MSG_ROUTE_REFRESH:
|
|
# If its route-refresh message
|
|
assert self.state.bgp_state == const.BGP_FSM_ESTABLISHED
|
|
self._handle_route_refresh_msg(msg)
|
|
|
|
else:
|
|
# Open/Notification messages are currently handled by protocol and
|
|
# nothing is done inside peer, so should not see them here.
|
|
raise ValueError('Peer does not support handling of %s'
|
|
' message during %s state' %
|
|
(msg, self.state.bgp_state))
|
|
|
|
def _handle_err_sor_msg(self, afi, safi):
|
|
# Check if ERR capability is enabled for this peer.
|
|
if not self._protocol.is_enhanced_rr_cap_valid():
|
|
LOG.error('Received Start-of-RIB (SOR) even though ERR is not'
|
|
' enabled')
|
|
return
|
|
|
|
# Increment the version number of this peer so that we can identify
|
|
# inconsistencies/stale routes.
|
|
self.version_num += 1
|
|
|
|
# Check if refresh_stalepath_time is enabled.
|
|
rst = self._common_conf.refresh_stalepath_time
|
|
if rst != 0:
|
|
# Set a timer to clean the stale paths at configured time.
|
|
# Clean/track inconsistent/stale routes.
|
|
route_family = RouteFamily(afi, safi)
|
|
if route_family in SUPPORTED_GLOBAL_RF:
|
|
self._refresh_stalepath_timer = self._spawn_after(
|
|
'err-refresh-stale-path-timer', rst,
|
|
self._core_service.table_manager.clean_stale_routes, self,
|
|
route_family)
|
|
LOG.debug('Refresh Stale Path timer set (%s sec).', rst)
|
|
|
|
def _handle_route_refresh_msg(self, msg):
|
|
afi = msg.afi
|
|
safi = msg.safi
|
|
demarcation = msg.demarcation
|
|
|
|
# If this normal route-refresh request.
|
|
if demarcation == 0:
|
|
self._handle_route_refresh_req(afi, safi)
|
|
|
|
# If this is start of RIB (SOR) message.
|
|
elif demarcation == 1:
|
|
self._handle_err_sor_msg(afi, safi)
|
|
|
|
# If this is end of RIB (EOR) message.
|
|
elif demarcation == 2:
|
|
# Clean/track inconsistent/stale routes.
|
|
route_family = RouteFamily(afi, safi)
|
|
if route_family in SUPPORTED_GLOBAL_RF:
|
|
tm = self._core_service.table_manager
|
|
tm.clean_stale_routes(self, route_family)
|
|
|
|
else:
|
|
LOG.error('Route refresh message has invalid demarcation %s',
|
|
demarcation)
|
|
|
|
def _handle_route_refresh_req(self, afi, safi):
|
|
rr_af = get_rf(afi, safi)
|
|
self.state.incr(PeerCounterNames.RECV_REFRESH)
|
|
|
|
# Check if peer has asked for route-refresh for af that was advertised
|
|
if not self._protocol.is_route_family_adv(rr_af):
|
|
LOG.info('Peer asked for route - refresh for un - advertised '
|
|
'address - family %s', rr_af)
|
|
return
|
|
|
|
self._fire_route_refresh(rr_af)
|
|
|
|
def _fire_route_refresh(self, af):
|
|
# Check if enhanced route refresh is enabled/valid.
|
|
sor = None
|
|
if self._protocol.is_enhanced_rr_cap_valid():
|
|
# If enhanced route-refresh is valid/enabled, enqueue SOR.
|
|
afi = af.afi
|
|
safi = af.safi
|
|
sor = BGPRouteRefresh(afi, safi, demarcation=1)
|
|
self.enque_first_outgoing_msg(sor)
|
|
|
|
# Ask core to re-send sent routes
|
|
self._peer_manager.resend_sent(af, self)
|
|
|
|
# If enhanced route-refresh is valid/enabled, then we enqueue EOR.
|
|
if sor is not None:
|
|
self._enqueue_eor_msg(sor)
|
|
|
|
def _enqueue_eor_msg(self, sor):
|
|
"""Enqueues Enhanced RR EOR if for given SOR a EOR is not already
|
|
sent.
|
|
"""
|
|
if self._protocol.is_enhanced_rr_cap_valid() and not sor.eor_sent:
|
|
afi = sor.afi
|
|
safi = sor.safi
|
|
eor = BGPRouteRefresh(afi, safi, demarcation=2)
|
|
self.enque_outgoing_msg(eor)
|
|
sor.eor_sent = True
|
|
|
|
def _schedule_sending_init_updates(self):
|
|
"""Setup timer for sending best-paths for all other address-families
|
|
that qualify.
|
|
|
|
Setup timer for sending initial updates to peer.
|
|
"""
|
|
|
|
def _enqueue_non_rtc_init_updates():
|
|
LOG.debug('Scheduled queuing of initial Non-RTC UPDATEs')
|
|
tm = self._core_service.table_manager
|
|
self.comm_all_best_paths(tm.global_tables)
|
|
self._sent_init_non_rtc_update = True
|
|
# Stop the timer as we have handled RTC EOR
|
|
self._rtc_eor_timer.stop()
|
|
self._rtc_eor_timer = None
|
|
|
|
self._sent_init_non_rtc_update = False
|
|
self._rtc_eor_timer = self._create_timer(
|
|
Peer.RTC_EOR_TIMER_NAME,
|
|
_enqueue_non_rtc_init_updates
|
|
)
|
|
# Start timer for sending initial updates
|
|
self._rtc_eor_timer.start(const.RTC_EOR_DEFAULT_TIME, now=False)
|
|
LOG.debug('Scheduled sending of initial Non-RTC UPDATEs after:'
|
|
' %s sec', const.RTC_EOR_DEFAULT_TIME)
|
|
|
|
def _unschedule_sending_init_updates(self):
|
|
"""Un-schedules sending of initial updates
|
|
|
|
Stops the timer if set for sending initial updates.
|
|
Returns:
|
|
- True if timer was stopped
|
|
- False if timer was already stopped and nothing was done
|
|
"""
|
|
LOG.debug('Un-scheduling sending of initial Non-RTC UPDATEs'
|
|
' (init. UPDATEs already sent: %s)',
|
|
self._sent_init_non_rtc_update)
|
|
if self._rtc_eor_timer:
|
|
self._rtc_eor_timer.stop()
|
|
self._rtc_eor_timer = None
|
|
return True
|
|
return False
|
|
|
|
def _enqueue_init_updates(self):
|
|
"""Enqueues current routes to be shared with this peer."""
|
|
assert self.state.bgp_state == const.BGP_FSM_ESTABLISHED
|
|
if self.is_mbgp_cap_valid(RF_RTC_UC):
|
|
# Enqueues all best-RTC_NLRIs to be sent as initial update to this
|
|
# peer.
|
|
self._peer_manager.comm_all_rt_nlris(self)
|
|
self._schedule_sending_init_updates()
|
|
else:
|
|
# Enqueues all best-path to be sent as initial update to this peer
|
|
# expect for RTC route-family.
|
|
tm = self._core_service.table_manager
|
|
self.comm_all_best_paths(tm.global_tables)
|
|
|
|
def comm_all_best_paths(self, global_tables):
|
|
"""Shares/communicates current best paths with this peers.
|
|
|
|
Can be used to send initial updates after we have established session
|
|
with `peer`.
|
|
"""
|
|
LOG.debug('Communicating current best path for all afi/safi except'
|
|
' 1/132')
|
|
# We will enqueue best path from all global destination.
|
|
for route_family, table in global_tables.items():
|
|
if route_family == RF_RTC_UC:
|
|
continue
|
|
if self.is_mbgp_cap_valid(route_family):
|
|
for dest in table.values():
|
|
if dest.best_path:
|
|
self.communicate_path(dest.best_path)
|
|
|
|
def communicate_path(self, path):
|
|
"""Communicates `path` to this peer if it qualifies.
|
|
|
|
Checks if `path` should be shared/communicated with this peer according
|
|
to various conditions: like bgp state, transmit side loop, local and
|
|
remote AS path, community attribute, etc.
|
|
"""
|
|
LOG.debug('Peer %s asked to communicate path', self)
|
|
if not path:
|
|
raise ValueError('Invalid path %s given.' % path)
|
|
|
|
# We do not send anything to peer who is not in established state.
|
|
if not self.in_established():
|
|
LOG.debug('Skipping sending path as peer is not in '
|
|
'ESTABLISHED state %s', path)
|
|
return
|
|
|
|
# Check if this session is available for given paths afi/safi
|
|
path_rf = path.route_family
|
|
if not (self.is_mpbgp_cap_valid(path_rf) or
|
|
path_rf in [RF_IPv4_UC, RF_IPv6_UC]):
|
|
LOG.debug('Skipping sending path as %s route family is not'
|
|
' available for this session', path_rf)
|
|
return
|
|
|
|
# If RTC capability is available and path afi/saif is other than RT
|
|
# nlri
|
|
if path_rf != RF_RTC_UC and \
|
|
self.is_mpbgp_cap_valid(RF_RTC_UC):
|
|
rtfilter = self._peer_manager.curr_peer_rtfilter(self)
|
|
# If peer does not have any rtfilter or if rtfilter does not have
|
|
# any RTs common with path RTs we do not share this path with the
|
|
# peer
|
|
if rtfilter and not path.has_rts_in(rtfilter):
|
|
LOG.debug('Skipping sending path as rffilter %s and path '
|
|
'rts %s have no RT in common',
|
|
rtfilter, path.get_rts())
|
|
return
|
|
|
|
# Transmit side loop detection: We check if leftmost AS matches
|
|
# peers AS, if so we do not send UPDATE message to this peer.
|
|
as_path = path.get_pattr(BGP_ATTR_TYPE_AS_PATH)
|
|
if as_path and as_path.has_matching_leftmost(self.remote_as):
|
|
LOG.debug('Skipping sending path as AS_PATH has peer AS %s',
|
|
self.remote_as)
|
|
return
|
|
|
|
# If this peer is a route server client, we forward the path
|
|
# regardless of AS PATH loop, whether the connection is iBGP or eBGP,
|
|
# or path's communities.
|
|
if self.is_route_server_client:
|
|
outgoing_route = OutgoingRoute(path)
|
|
self.enque_outgoing_msg(outgoing_route)
|
|
|
|
if self._neigh_conf.multi_exit_disc:
|
|
med_attr = path.get_pattr(BGP_ATTR_TYPE_MULTI_EXIT_DISC)
|
|
if not med_attr:
|
|
path = bgp_utils.clone_path_and_update_med_for_target_neighbor(
|
|
path,
|
|
self._neigh_conf.multi_exit_disc
|
|
)
|
|
|
|
# For connected/local-prefixes, we send update to all peers.
|
|
if path.source is None:
|
|
# Construct OutgoingRoute specific for this peer and put it in
|
|
# its sink.
|
|
outgoing_route = OutgoingRoute(path)
|
|
self.enque_outgoing_msg(outgoing_route)
|
|
|
|
# If path from a bgp-peer is new best path, we share it with
|
|
# all bgp-peers except the source peer and other peers in his AS.
|
|
# This is default Junos setting that in Junos can be disabled with
|
|
# 'advertise-peer-as' setting.
|
|
elif (self != path.source or
|
|
self.remote_as != path.source.remote_as):
|
|
# When BGP speaker receives an UPDATE message from an internal
|
|
# peer, the receiving BGP speaker SHALL NOT re-distribute the
|
|
# routing information contained in that UPDATE message to other
|
|
# internal peers (unless the speaker acts as a BGP Route
|
|
# Reflector) [RFC4271].
|
|
if (self.remote_as == self._core_service.asn
|
|
and self.remote_as == path.source.remote_as
|
|
and isinstance(path.source, Peer)
|
|
and not path.source.is_route_reflector_client
|
|
and not self.is_route_reflector_client):
|
|
LOG.debug(
|
|
'Skipping sending iBGP route to iBGP peer %s AS %s',
|
|
self.ip_address, self.remote_as)
|
|
return
|
|
|
|
# If new best path has community attribute, it should be taken into
|
|
# account when sending UPDATE to peers.
|
|
comm_attr = path.get_pattr(BGP_ATTR_TYPE_COMMUNITIES)
|
|
if comm_attr:
|
|
comm_attr_na = comm_attr.has_comm_attr(
|
|
BGPPathAttributeCommunities.NO_ADVERTISE
|
|
)
|
|
# If we have NO_ADVERTISE attribute present, we do not send
|
|
# UPDATE to any peers
|
|
if comm_attr_na:
|
|
LOG.debug('Path has community attr. NO_ADVERTISE = %s'
|
|
'. Hence not advertising to peer',
|
|
comm_attr_na)
|
|
return
|
|
|
|
comm_attr_ne = comm_attr.has_comm_attr(
|
|
BGPPathAttributeCommunities.NO_EXPORT
|
|
)
|
|
comm_attr_nes = comm_attr.has_comm_attr(
|
|
BGPPathAttributeCommunities.NO_EXPORT_SUBCONFED
|
|
)
|
|
# If NO_EXPORT_SUBCONFED/NO_EXPORT is one of the attribute, we
|
|
# do not advertise to eBGP peers as we do not have any
|
|
# confederation feature at this time.
|
|
if ((comm_attr_nes or comm_attr_ne) and
|
|
(self.remote_as != self._core_service.asn)):
|
|
LOG.debug('Skipping sending UPDATE to peer: %s as per '
|
|
'community attribute configuration', self)
|
|
return
|
|
|
|
# Construct OutgoingRoute specific for this peer and put it in
|
|
# its sink.
|
|
outgoing_route = OutgoingRoute(path)
|
|
self.enque_outgoing_msg(outgoing_route)
|
|
LOG.debug('Enqueued outgoing route %s for peer %s',
|
|
outgoing_route.path.nlri, self)
|
|
|
|
def connection_made(self):
|
|
"""Protocols connection established handler
|
|
"""
|
|
LOG.info(
|
|
'Connection to peer: %s established',
|
|
self._neigh_conf.ip_address,
|
|
extra={
|
|
'resource_name': self._neigh_conf.name,
|
|
'resource_id': self._neigh_conf.id
|
|
}
|
|
)
|
|
|
|
def connection_lost(self, reason):
|
|
"""Protocols connection lost handler.
|
|
"""
|
|
LOG.info(
|
|
'Connection to peer %s lost, reason: %s Resetting '
|
|
'retry connect loop: %s' %
|
|
(self._neigh_conf.ip_address, reason,
|
|
self._connect_retry_event.is_set()),
|
|
extra={
|
|
'resource_name': self._neigh_conf.name,
|
|
'resource_id': self._neigh_conf.id
|
|
}
|
|
)
|
|
self.state.bgp_state = const.BGP_FSM_IDLE
|
|
if self._protocol:
|
|
self._protocol.stop()
|
|
self._protocol = None
|
|
# Create new collection for initial RT NLRIs
|
|
self._init_rtc_nlri_path = []
|
|
self._sent_init_non_rtc_update = False
|
|
# Clear sink.
|
|
self.clear_outgoing_msg_list()
|
|
# Un-schedule timers
|
|
self._unschedule_sending_init_updates()
|
|
|
|
# Increment the version number of this source.
|
|
self.version_num += 1
|
|
self._peer_manager.on_peer_down(self)
|
|
|
|
# Check configuration if neighbor is still enabled, we try
|
|
# reconnecting.
|
|
if self._neigh_conf.enabled:
|
|
if not self._connect_retry_event.is_set():
|
|
self._connect_retry_event.set()
|
|
|
|
@staticmethod
|
|
def _lookup_attribute_map(attribute_map, attr_type, path):
|
|
result_attr = None
|
|
if attr_type in attribute_map:
|
|
maps = attribute_map[attr_type]
|
|
for m in maps:
|
|
cause, result = m.evaluate(path)
|
|
LOG.debug(
|
|
"local_pref evaluation result:%s, cause:%s",
|
|
result, cause)
|
|
if result:
|
|
result_attr = m.get_attribute()
|
|
break
|
|
return result_attr
|