Add BGP Callback and agent RPC notifcation implementations

This patch adds RPC calls to the BGP speaker service plugin. This enables
the service plugin to notify the appropriate BGP dynamic routing agents
(bgp_dragent) when tenants add router gateway ports, router interfaces,
associate/disassociate floating IP's, and when and admin binds/unbinds
a gateway network to a BGP speaker. Compatible with centralized routers,
distributed routers not supported.

Partially-Implements: blueprint bgp-dynamic-routing
Co-Authored-By: vikram.choudhary <vikram.choudhary@huawei.com>
Change-Id: Ide76f9fdfba228000ddfa41a1168e77d21e92d9f
This commit is contained in:
vikram.choudhary 2016-02-14 16:51:00 +05:30 committed by Ryan Tidwell
parent 1c9e439e7a
commit 8bf46be67c
5 changed files with 248 additions and 14 deletions

View File

@ -876,13 +876,10 @@ class BgpDbMixin(common_db.CommonDbMixin):
query = query.filter(
BgpSpeakerNetworkBinding.network_id == network_id,
BgpSpeakerNetworkBinding.bgp_speaker_id == BgpSpeaker.id)
try:
return query.all()
except sa_exc.NoResultFound:
raise bgp_ext.NetworkNotBound(network_id=network_id)
return query.all()
def _bgp_speaker_for_gateway_network(self, context,
network_id, ip_version):
def _bgp_speakers_for_gw_network_by_family(self, context,
network_id, ip_version):
"""Return the BgpSpeaker by given gateway network and ip_version"""
with context.session.begin(subtransactions=True):
query = context.session.query(BgpSpeaker)
@ -890,12 +887,7 @@ class BgpDbMixin(common_db.CommonDbMixin):
BgpSpeakerNetworkBinding.network_id == network_id,
BgpSpeakerNetworkBinding.bgp_speaker_id == BgpSpeaker.id,
BgpSpeakerNetworkBinding.ip_version == ip_version)
try:
return query.one()
except sa_exc.NoResultFound:
raise bgp_ext.NetworkNotBoundForIpVersion(
network_id=network_id,
ip_version=ip_version)
return query.all()
def _make_advertised_routes_list(self, routes):
route_list = ({'destination': x,

View File

@ -358,6 +358,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
admin_ctx, {'router_id': [router_id]}):
raise l3.RouterExternalGatewayInUseByFloatingIp(
router_id=router_id, net_id=router.gw_port['network_id'])
gw_ips = [x['ip_address'] for x in router.gw_port.fixed_ips]
with context.session.begin(subtransactions=True):
gw_port = router.gw_port
router.gw_port = None
@ -369,7 +370,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
registry.notify(resources.ROUTER_GATEWAY,
events.AFTER_DELETE, self,
router_id=router_id,
network_id=old_network_id)
network_id=old_network_id,
gateway_ips=gw_ips)
def _check_router_gw_port_in_use(self, context, router_id):
try:
@ -819,15 +821,18 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
context, router_id, subnet_id, device_owner)
gw_network_id = None
gw_ips = []
router = self._get_router(context, router_id)
if router.gw_port:
gw_network_id = router.gw_port.network_id
gw_ips = [x['ip_address'] for x in router.gw_port.fixed_ips]
registry.notify(resources.ROUTER_INTERFACE,
events.AFTER_DELETE,
self,
cidrs=[x['cidr'] for x in subnets],
network_id=gw_network_id)
network_id=gw_network_id,
gateway_ips=gw_ips)
return self._make_router_interface_info(router_id, port['tenant_id'],
port['id'], port['network_id'],
subnets[0]['id'],

View File

@ -302,6 +302,18 @@ class L3_NAT_with_dvr_db_mixin(l3_db.L3_NAT_db_mixin,
subnet['id'], [subnet['id']])
self.notify_router_interface_action(
context, router_interface_info, 'add')
if router.gw_port:
gw_network_id = router.gw_port.network_id
gw_ips = [x['ip_address'] for x in router.gw_port.fixed_ips]
registry.notify(resources.ROUTER_INTERFACE,
events.AFTER_CREATE,
self,
network_id=gw_network_id,
gateway_ips=gw_ips,
cidrs=[x['cidr'] for x in subnets],
port_id=port['id'],
router_id=router_id,
interface_info=interface_info)
return router_interface_info
def _port_has_ipv6_address(self, port, csnat_port_check=True):

View File

@ -12,13 +12,19 @@
# License for the specific language governing permissions and limitations
# under the License.
from netaddr import IPAddress
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from neutron.api.rpc.agentnotifiers import bgp_dr_rpc_agent_api
from neutron.api.rpc.handlers import bgp_speaker_rpc as bs_rpc
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import constants as n_const
from neutron.common import rpc as n_rpc
from neutron import context
from neutron.db import bgp_db
from neutron.db import bgp_dragentscheduler_db
from neutron.extensions import bgp as bgp_ext
@ -42,6 +48,7 @@ class BgpPlugin(service_base.ServicePluginBase,
self.bgp_drscheduler = importutils.import_object(
cfg.CONF.bgp_drscheduler_driver)
self._setup_rpc()
self._register_callbacks()
def get_plugin_name(self):
return PLUGIN_NAME
@ -66,6 +73,41 @@ class BgpPlugin(service_base.ServicePluginBase,
fanout=False)
self.conn.consume_in_threads()
def _register_callbacks(self):
registry.subscribe(self.floatingip_update_callback,
resources.FLOATING_IP,
events.AFTER_UPDATE)
registry.subscribe(self.router_interface_callback,
resources.ROUTER_INTERFACE,
events.AFTER_CREATE)
registry.subscribe(self.router_interface_callback,
resources.ROUTER_INTERFACE,
events.BEFORE_CREATE)
registry.subscribe(self.router_interface_callback,
resources.ROUTER_INTERFACE,
events.AFTER_DELETE)
registry.subscribe(self.router_gateway_callback,
resources.ROUTER_GATEWAY,
events.AFTER_CREATE)
registry.subscribe(self.router_gateway_callback,
resources.ROUTER_GATEWAY,
events.AFTER_DELETE)
def create_bgp_speaker(self, context, bgp_speaker):
bgp_speaker = super(BgpPlugin, self).create_bgp_speaker(context,
bgp_speaker)
return bgp_speaker
def delete_bgp_speaker(self, context, bgp_speaker_id):
hosted_bgp_dragents = self.get_dragents_hosting_bgp_speakers(
context,
[bgp_speaker_id])
super(BgpPlugin, self).delete_bgp_speaker(context, bgp_speaker_id)
for agent in hosted_bgp_dragents:
self._bgp_rpc.bgp_speaker_removed(context,
bgp_speaker_id,
agent.host)
def add_bgp_peer(self, context, bgp_speaker_id, bgp_peer_info):
ret_value = super(BgpPlugin, self).add_bgp_peer(context,
bgp_speaker_id,
@ -92,3 +134,156 @@ class BgpPlugin(service_base.ServicePluginBase,
bgp_speaker_id,
ret_value['bgp_peer_id'],
agent.host)
def floatingip_update_callback(self, resource, event, trigger, **kwargs):
if event != events.AFTER_UPDATE:
return
ctx = context.get_admin_context()
new_router_id = kwargs['router_id']
last_router_id = kwargs['last_known_router_id']
next_hop = kwargs['next_hop']
dest = kwargs['floating_ip_address'] + '/32'
bgp_speakers = self._bgp_speakers_for_gw_network_by_family(
ctx,
kwargs['floating_network_id'],
n_const.IP_VERSION_4)
if last_router_id and new_router_id != last_router_id:
for bgp_speaker in bgp_speakers:
self.stop_route_advertisements(ctx, self._bgp_rpc,
bgp_speaker.id, [dest])
if next_hop and new_router_id != last_router_id:
new_host_route = {'destination': dest, 'next_hop': next_hop}
for bgp_speaker in bgp_speakers:
self.start_route_advertisements(ctx, self._bgp_rpc,
bgp_speaker.id,
[new_host_route])
def router_interface_callback(self, resource, event, trigger, **kwargs):
if event == events.AFTER_CREATE:
self._handle_router_interface_after_create(**kwargs)
if event == events.AFTER_DELETE:
gw_network = kwargs['network_id']
next_hops = self._next_hops_from_gateway_ips(
kwargs['gateway_ips'])
ctx = context.get_admin_context()
speakers = self._bgp_speakers_for_gateway_network(ctx, gw_network)
for speaker in speakers:
routes = self._route_list_from_prefixes_and_next_hop(
kwargs['cidrs'],
next_hops[speaker.ip_version])
self._handle_router_interface_after_delete(gw_network, routes)
def _handle_router_interface_after_create(self, **kwargs):
gw_network = kwargs['network_id']
if not gw_network:
return
ctx = context.get_admin_context()
with ctx.session.begin(subtransactions=True):
speakers = self._bgp_speakers_for_gateway_network(ctx,
gw_network)
next_hops = self._next_hops_from_gateway_ips(
kwargs['gateway_ips'])
for speaker in speakers:
prefixes = self._tenant_prefixes_by_router(
ctx,
kwargs['router_id'],
speaker.id)
next_hop = next_hops.get(speaker.ip_version)
if next_hop:
rl = self._route_list_from_prefixes_and_next_hop(prefixes,
next_hop)
self.start_route_advertisements(ctx,
self._bgp_rpc,
speaker.id,
rl)
def router_gateway_callback(self, resource, event, trigger, **kwargs):
if event == events.AFTER_CREATE:
self._handle_router_gateway_after_create(**kwargs)
if event == events.AFTER_DELETE:
gw_network = kwargs['network_id']
router_id = kwargs['router_id']
next_hops = self._next_hops_from_gateway_ips(
kwargs['gateway_ips'])
ctx = context.get_admin_context()
speakers = self._bgp_speakers_for_gateway_network(ctx, gw_network)
for speaker in speakers:
if speaker.ip_version in next_hops:
next_hop = next_hops[speaker.ip_version]
prefixes = self._tenant_prefixes_by_router(ctx,
router_id,
speaker.id)
routes = self._route_list_from_prefixes_and_next_hop(
prefixes,
next_hop)
self._handle_router_interface_after_delete(gw_network, routes)
def _handle_router_gateway_after_create(self, **kwargs):
ctx = context.get_admin_context()
gw_network = kwargs['network_id']
router_id = kwargs['router_id']
with ctx.session.begin(subtransactions=True):
speakers = self._bgp_speakers_for_gateway_network(ctx,
gw_network)
next_hops = self._next_hops_from_gateway_ips(kwargs['gw_ips'])
for speaker in speakers:
if speaker.ip_version in next_hops:
next_hop = next_hops[speaker.ip_version]
prefixes = self._tenant_prefixes_by_router(ctx,
router_id,
speaker.id)
routes = self._route_list_from_prefixes_and_next_hop(
prefixes,
next_hop)
self.start_route_advertisements(ctx, self._bgp_rpc,
speaker.id, routes)
def _handle_router_interface_after_delete(self, gw_network, routes):
if gw_network and routes:
ctx = context.get_admin_context()
speakers = self._bgp_speakers_for_gateway_network(ctx, gw_network)
for speaker in speakers:
self.stop_route_advertisements(ctx, self._bgp_rpc,
speaker.id, routes)
def _next_hops_from_gateway_ips(self, gw_ips):
if gw_ips:
return {IPAddress(ip).version: ip for ip in gw_ips}
return {}
def start_route_advertisements(self, ctx, bgp_rpc,
bgp_speaker_id, routes):
agents = self.list_dragent_hosting_bgp_speaker(ctx, bgp_speaker_id)
for agent in agents['agents']:
bgp_rpc.bgp_routes_advertisement(ctx,
bgp_speaker_id,
routes,
agent['host'])
msg = "Starting route advertisements for %s on BgpSpeaker %s"
self._debug_log_for_routes(msg, routes, bgp_speaker_id)
def stop_route_advertisements(self, ctx, bgp_rpc,
bgp_speaker_id, routes):
agents = self.list_dragent_hosting_bgp_speaker(ctx, bgp_speaker_id)
for agent in agents['agents']:
bgp_rpc.bgp_routes_withdrawal(ctx,
bgp_speaker_id,
routes,
agent['host'])
msg = "Stopping route advertisements for %s on BgpSpeaker %s"
self._debug_log_for_routes(msg, routes, bgp_speaker_id)
def _debug_log_for_routes(self, msg, routes, bgp_speaker_id):
# Could have a large number of routes passed, check log level first
if LOG.isEnabledFor(logging.DEBUG):
for route in routes:
LOG.debug(msg, route, bgp_speaker_id)

View File

@ -828,3 +828,33 @@ class BgpTests(test_plugin.Ml2PluginV2TestCase,
fip_prefix_found = True
self.assertTrue(tenant_prefix_found)
self.assertTrue(fip_prefix_found)
def test__bgp_speakers_for_gateway_network_by_ip_version(self):
with self.network() as ext_net, self.bgp_speaker(6, 1234) as s1,\
self.bgp_speaker(6, 4321) as s2:
gw_net_id = ext_net['network']['id']
self._update('networks', gw_net_id,
{'network': {external_net.EXTERNAL: True}})
self.bgp_plugin.add_gateway_network(self.context,
s1['id'],
{'network_id': gw_net_id})
self.bgp_plugin.add_gateway_network(self.context,
s2['id'],
{'network_id': gw_net_id})
speakers = self.bgp_plugin._bgp_speakers_for_gw_network_by_family(
self.context,
gw_net_id,
6)
self.assertEqual(2, len(speakers))
def test__bgp_speakers_for_gateway_network_by_ip_version_no_binding(self):
with self.network() as ext_net, self.bgp_speaker(6, 1234),\
self.bgp_speaker(6, 4321):
gw_net_id = ext_net['network']['id']
self._update('networks', gw_net_id,
{'network': {external_net.EXTERNAL: True}})
speakers = self.bgp_plugin._bgp_speakers_for_gw_network_by_family(
self.context,
gw_net_id,
6)
self.assertTrue(not speakers)