Merge "Add BGP Callback and agent RPC notifcation implementations"

This commit is contained in:
Jenkins 2016-03-09 03:45:25 +00:00 committed by Gerrit Code Review
commit 54a79091e9
3 changed files with 229 additions and 12 deletions

View File

@ -876,13 +876,10 @@ class BgpDbMixin(common_db.CommonDbMixin):
query = query.filter(
BgpSpeakerNetworkBinding.network_id == network_id,
BgpSpeakerNetworkBinding.bgp_speaker_id == BgpSpeaker.id)
try:
return query.all()
except sa_exc.NoResultFound:
raise bgp_ext.NetworkNotBound(network_id=network_id)
return query.all()
def _bgp_speaker_for_gateway_network(self, context,
network_id, ip_version):
def _bgp_speakers_for_gw_network_by_family(self, context,
network_id, ip_version):
"""Return the BgpSpeaker by given gateway network and ip_version"""
with context.session.begin(subtransactions=True):
query = context.session.query(BgpSpeaker)
@ -890,12 +887,7 @@ class BgpDbMixin(common_db.CommonDbMixin):
BgpSpeakerNetworkBinding.network_id == network_id,
BgpSpeakerNetworkBinding.bgp_speaker_id == BgpSpeaker.id,
BgpSpeakerNetworkBinding.ip_version == ip_version)
try:
return query.one()
except sa_exc.NoResultFound:
raise bgp_ext.NetworkNotBoundForIpVersion(
network_id=network_id,
ip_version=ip_version)
return query.all()
def _make_advertised_routes_list(self, routes):
route_list = ({'destination': x,

View File

@ -12,13 +12,19 @@
# License for the specific language governing permissions and limitations
# under the License.
from netaddr import IPAddress
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from neutron.api.rpc.agentnotifiers import bgp_dr_rpc_agent_api
from neutron.api.rpc.handlers import bgp_speaker_rpc as bs_rpc
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import constants as n_const
from neutron.common import rpc as n_rpc
from neutron import context
from neutron.db import bgp_db
from neutron.db import bgp_dragentscheduler_db
from neutron.extensions import bgp as bgp_ext
@ -42,6 +48,7 @@ class BgpPlugin(service_base.ServicePluginBase,
self.bgp_drscheduler = importutils.import_object(
cfg.CONF.bgp_drscheduler_driver)
self._setup_rpc()
self._register_callbacks()
def get_plugin_name(self):
return PLUGIN_NAME
@ -66,6 +73,41 @@ class BgpPlugin(service_base.ServicePluginBase,
fanout=False)
self.conn.consume_in_threads()
def _register_callbacks(self):
registry.subscribe(self.floatingip_update_callback,
resources.FLOATING_IP,
events.AFTER_UPDATE)
registry.subscribe(self.router_interface_callback,
resources.ROUTER_INTERFACE,
events.AFTER_CREATE)
registry.subscribe(self.router_interface_callback,
resources.ROUTER_INTERFACE,
events.BEFORE_CREATE)
registry.subscribe(self.router_interface_callback,
resources.ROUTER_INTERFACE,
events.AFTER_DELETE)
registry.subscribe(self.router_gateway_callback,
resources.ROUTER_GATEWAY,
events.AFTER_CREATE)
registry.subscribe(self.router_gateway_callback,
resources.ROUTER_GATEWAY,
events.AFTER_DELETE)
def create_bgp_speaker(self, context, bgp_speaker):
bgp_speaker = super(BgpPlugin, self).create_bgp_speaker(context,
bgp_speaker)
return bgp_speaker
def delete_bgp_speaker(self, context, bgp_speaker_id):
hosted_bgp_dragents = self.get_dragents_hosting_bgp_speakers(
context,
[bgp_speaker_id])
super(BgpPlugin, self).delete_bgp_speaker(context, bgp_speaker_id)
for agent in hosted_bgp_dragents:
self._bgp_rpc.bgp_speaker_removed(context,
bgp_speaker_id,
agent.host)
def add_bgp_peer(self, context, bgp_speaker_id, bgp_peer_info):
ret_value = super(BgpPlugin, self).add_bgp_peer(context,
bgp_speaker_id,
@ -92,3 +134,156 @@ class BgpPlugin(service_base.ServicePluginBase,
bgp_speaker_id,
ret_value['bgp_peer_id'],
agent.host)
def floatingip_update_callback(self, resource, event, trigger, **kwargs):
if event != events.AFTER_UPDATE:
return
ctx = context.get_admin_context()
new_router_id = kwargs['router_id']
last_router_id = kwargs['last_known_router_id']
next_hop = kwargs['next_hop']
dest = kwargs['floating_ip_address'] + '/32'
bgp_speakers = self._bgp_speakers_for_gw_network_by_family(
ctx,
kwargs['floating_network_id'],
n_const.IP_VERSION_4)
if last_router_id and new_router_id != last_router_id:
for bgp_speaker in bgp_speakers:
self.stop_route_advertisements(ctx, self._bgp_rpc,
bgp_speaker.id, [dest])
if next_hop and new_router_id != last_router_id:
new_host_route = {'destination': dest, 'next_hop': next_hop}
for bgp_speaker in bgp_speakers:
self.start_route_advertisements(ctx, self._bgp_rpc,
bgp_speaker.id,
[new_host_route])
def router_interface_callback(self, resource, event, trigger, **kwargs):
if event == events.AFTER_CREATE:
self._handle_router_interface_after_create(**kwargs)
if event == events.AFTER_DELETE:
gw_network = kwargs['network_id']
next_hops = self._next_hops_from_gateway_ips(
kwargs['gateway_ips'])
ctx = context.get_admin_context()
speakers = self._bgp_speakers_for_gateway_network(ctx, gw_network)
for speaker in speakers:
routes = self._route_list_from_prefixes_and_next_hop(
kwargs['cidrs'],
next_hops[speaker.ip_version])
self._handle_router_interface_after_delete(gw_network, routes)
def _handle_router_interface_after_create(self, **kwargs):
gw_network = kwargs['network_id']
if not gw_network:
return
ctx = context.get_admin_context()
with ctx.session.begin(subtransactions=True):
speakers = self._bgp_speakers_for_gateway_network(ctx,
gw_network)
next_hops = self._next_hops_from_gateway_ips(
kwargs['gateway_ips'])
for speaker in speakers:
prefixes = self._tenant_prefixes_by_router(
ctx,
kwargs['router_id'],
speaker.id)
next_hop = next_hops.get(speaker.ip_version)
if next_hop:
rl = self._route_list_from_prefixes_and_next_hop(prefixes,
next_hop)
self.start_route_advertisements(ctx,
self._bgp_rpc,
speaker.id,
rl)
def router_gateway_callback(self, resource, event, trigger, **kwargs):
if event == events.AFTER_CREATE:
self._handle_router_gateway_after_create(**kwargs)
if event == events.AFTER_DELETE:
gw_network = kwargs['network_id']
router_id = kwargs['router_id']
next_hops = self._next_hops_from_gateway_ips(
kwargs['gateway_ips'])
ctx = context.get_admin_context()
speakers = self._bgp_speakers_for_gateway_network(ctx, gw_network)
for speaker in speakers:
if speaker.ip_version in next_hops:
next_hop = next_hops[speaker.ip_version]
prefixes = self._tenant_prefixes_by_router(ctx,
router_id,
speaker.id)
routes = self._route_list_from_prefixes_and_next_hop(
prefixes,
next_hop)
self._handle_router_interface_after_delete(gw_network, routes)
def _handle_router_gateway_after_create(self, **kwargs):
ctx = context.get_admin_context()
gw_network = kwargs['network_id']
router_id = kwargs['router_id']
with ctx.session.begin(subtransactions=True):
speakers = self._bgp_speakers_for_gateway_network(ctx,
gw_network)
next_hops = self._next_hops_from_gateway_ips(kwargs['gw_ips'])
for speaker in speakers:
if speaker.ip_version in next_hops:
next_hop = next_hops[speaker.ip_version]
prefixes = self._tenant_prefixes_by_router(ctx,
router_id,
speaker.id)
routes = self._route_list_from_prefixes_and_next_hop(
prefixes,
next_hop)
self.start_route_advertisements(ctx, self._bgp_rpc,
speaker.id, routes)
def _handle_router_interface_after_delete(self, gw_network, routes):
if gw_network and routes:
ctx = context.get_admin_context()
speakers = self._bgp_speakers_for_gateway_network(ctx, gw_network)
for speaker in speakers:
self.stop_route_advertisements(ctx, self._bgp_rpc,
speaker.id, routes)
def _next_hops_from_gateway_ips(self, gw_ips):
if gw_ips:
return {IPAddress(ip).version: ip for ip in gw_ips}
return {}
def start_route_advertisements(self, ctx, bgp_rpc,
bgp_speaker_id, routes):
agents = self.list_dragent_hosting_bgp_speaker(ctx, bgp_speaker_id)
for agent in agents['agents']:
bgp_rpc.bgp_routes_advertisement(ctx,
bgp_speaker_id,
routes,
agent['host'])
msg = "Starting route advertisements for %s on BgpSpeaker %s"
self._debug_log_for_routes(msg, routes, bgp_speaker_id)
def stop_route_advertisements(self, ctx, bgp_rpc,
bgp_speaker_id, routes):
agents = self.list_dragent_hosting_bgp_speaker(ctx, bgp_speaker_id)
for agent in agents['agents']:
bgp_rpc.bgp_routes_withdrawal(ctx,
bgp_speaker_id,
routes,
agent['host'])
msg = "Stopping route advertisements for %s on BgpSpeaker %s"
self._debug_log_for_routes(msg, routes, bgp_speaker_id)
def _debug_log_for_routes(self, msg, routes, bgp_speaker_id):
# Could have a large number of routes passed, check log level first
if LOG.isEnabledFor(logging.DEBUG):
for route in routes:
LOG.debug(msg, route, bgp_speaker_id)

View File

@ -828,3 +828,33 @@ class BgpTests(test_plugin.Ml2PluginV2TestCase,
fip_prefix_found = True
self.assertTrue(tenant_prefix_found)
self.assertTrue(fip_prefix_found)
def test__bgp_speakers_for_gateway_network_by_ip_version(self):
with self.network() as ext_net, self.bgp_speaker(6, 1234) as s1,\
self.bgp_speaker(6, 4321) as s2:
gw_net_id = ext_net['network']['id']
self._update('networks', gw_net_id,
{'network': {external_net.EXTERNAL: True}})
self.bgp_plugin.add_gateway_network(self.context,
s1['id'],
{'network_id': gw_net_id})
self.bgp_plugin.add_gateway_network(self.context,
s2['id'],
{'network_id': gw_net_id})
speakers = self.bgp_plugin._bgp_speakers_for_gw_network_by_family(
self.context,
gw_net_id,
6)
self.assertEqual(2, len(speakers))
def test__bgp_speakers_for_gateway_network_by_ip_version_no_binding(self):
with self.network() as ext_net, self.bgp_speaker(6, 1234),\
self.bgp_speaker(6, 4321):
gw_net_id = ext_net['network']['id']
self._update('networks', gw_net_id,
{'network': {external_net.EXTERNAL: True}})
speakers = self.bgp_plugin._bgp_speakers_for_gw_network_by_family(
self.context,
gw_net_id,
6)
self.assertTrue(not speakers)