[AIM] Add APIC topology RPC listener

RPC listeners are implemented to handle topology
change events (update_link, delete_link) reported
by agents on hosts. This host-link information
is saved to AIM HostLink resource. Also the static
path binding of EPGs that have ports on
the reporting host is also updated to account
for the addition/deletion of the host-link.

Change-Id: I9e23628d824829c77abcf424fb27493602ed469a
Signed-off-by: Amit Bose <amitbose@gmail.com>
(cherry picked from commit 6a4670ba52)
This commit is contained in:
Amit Bose
2016-12-12 15:44:22 -08:00
parent 467dda26c6
commit 204ffe8500
2 changed files with 230 additions and 21 deletions

View File

@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
import sqlalchemy as sa
from aim.aim_lib import nat_strategy
@@ -28,6 +29,7 @@ from neutron._i18n import _LW
from neutron.api.v2 import attributes
from neutron.common import constants as n_constants
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics as n_topics
from neutron.db import address_scope_db
from neutron.db import api as db_api
@@ -36,19 +38,22 @@ from neutron.db import models_v2
from neutron.extensions import portbindings
from neutron import manager
from neutron.plugins.common import constants as pconst
from neutron.plugins.ml2 import db
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2 import models
from opflexagent import constants as ofcst
from opflexagent import rpc as ofrpc
from oslo_log import log
import oslo_messaging
from apic_ml2.neutron.plugins.ml2.drivers.cisco.apic import (rpc as
apic_topo_rpc)
from gbpservice.neutron.extensions import cisco_apic
from gbpservice.neutron.extensions import cisco_apic_l3 as a_l3
from gbpservice.neutron.plugins.ml2plus import driver_api as api_plus
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import apic_mapper
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import cache
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import extension_db
from oslo_serialization.jsonutils import netaddr
LOG = log.getLogger(__name__)
@@ -94,6 +99,18 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
# TODO(rkukura): Derivations of tenant_aname throughout need to
# take sharing into account.
class TopologyRpcEndpoint(object):
target = oslo_messaging.Target(version='1.2')
def __init__(self, mechanism_driver):
self.md = mechanism_driver
def update_link(self, *args, **kwargs):
self.md.update_link(*args, **kwargs)
def delete_link(self, *args, **kwargs):
self.md.delete_link(*args, **kwargs)
def __init__(self):
LOG.info(_LI("APIC AIM MD __init__"))
@@ -115,6 +132,12 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
self.ap_name = self.aim_cfg_mgr.get_option_and_subscribe(
self._set_ap_name, 'apic_app_profile_name', 'apic')
self.notifier = ofrpc.AgentNotifierApi(n_topics.AGENT)
# setup APIC topology RPC handler
self.topology_conn = n_rpc.create_connection(new=True)
self.topology_conn.create_consumer(apic_topo_rpc.TOPIC_APIC_SERVICE,
[self.TopologyRpcEndpoint(self)],
fanout=False)
self.topology_conn.consume_in_threads()
def ensure_tenant(self, plugin_context, tenant_id):
LOG.debug("APIC AIM MD ensuring tenant_id: %s", tenant_id)
@@ -956,6 +979,68 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
if current['port_id']:
self._notify_port_update(context, current['port_id'])
# Topology RPC method handler
def update_link(self, context, host, interface, mac,
switch, module, port, port_description=''):
LOG.debug('Topology RPC: update_link: %s',
', '.join([str(p) for p in
(host, interface, mac, switch, module, port,
port_description)]))
if not switch:
self.delete_link(context, host, interface, mac, switch, module,
port)
return
session = context.session
aim_ctx = aim_context.AimContext(db_session=session)
hlink = self.aim.get(aim_ctx,
aim_infra.HostLink(host_name=host,
interface_name=interface))
if not hlink or hlink.path != port_description:
attrs = dict(interface_mac=mac,
switch_id=switch, module=module, port=port,
path=port_description)
if hlink:
old_path = hlink.path
hlink = self.aim.update(aim_ctx, hlink, **attrs)
else:
old_path = None
hlink = aim_infra.HostLink(host_name=host,
interface_name=interface,
**attrs)
hlink = self.aim.create(aim_ctx, hlink)
# Update static paths of all EPGs with ports on the host
nets_segs = self._get_non_opflex_segments_on_host(context, host)
for net, seg in nets_segs:
self._update_static_path_for_network(session, net, seg,
old_path=old_path,
new_path=hlink.path)
# Topology RPC method handler
def delete_link(self, context, host, interface, mac, switch, module, port):
LOG.debug('Topology RPC: delete_link: %s',
', '.join([str(p) for p in
(host, interface, mac, switch, module, port)]))
session = context.session
aim_ctx = aim_context.AimContext(db_session=session)
hlink = self.aim.get(aim_ctx,
aim_infra.HostLink(host_name=host,
interface_name=interface))
if not hlink:
return
self.aim.delete(aim_ctx, hlink)
# if there are no more host-links for this host (multiple links may
# exist with VPC), update EPGs with ports on this host to remove
# the static path to this host
if not self.aim.find(aim_ctx, aim_infra.HostLink, host_name=host,
path=hlink.path):
nets_segs = self._get_non_opflex_segments_on_host(context, host)
for net, seg in nets_segs:
self._update_static_path_for_network(session, net, seg,
old_path=hlink.path)
def _agent_bind_port(self, context, agent_type, bind_strategy):
current = context.current
for agent in context.host_agents(agent_type):
@@ -1560,6 +1645,38 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
self._is_supported_non_opflex_type(
bound_segment[api.NETWORK_TYPE]))
def _update_static_path_for_network(self, session, network, segment,
old_path=None, new_path=None):
if new_path and not segment:
return
epg = self._map_network_to_epg(session, network)
if not epg:
LOG.info(_LI('Network %s does not map to any EPG'), network['id'])
return
if segment:
if segment.get(api.NETWORK_TYPE) in [pconst.TYPE_VLAN]:
seg = 'vlan-%s' % segment[api.SEGMENTATION_ID]
else:
LOG.debug('Unsupported segmentation type for static path '
'binding: %s',
segment.get(api.NETWORK_TYPE))
return
aim_ctx = aim_context.AimContext(db_session=session)
epg = self.aim.get(aim_ctx, epg)
to_remove = [old_path] if old_path else []
to_remove.extend([new_path] if new_path else [])
if to_remove:
epg.static_paths = [p for p in epg.static_paths
if p.get('path') not in to_remove]
if new_path:
epg.static_paths.append({'path': new_path, 'encap': seg})
LOG.debug('Setting static paths for EPG %s to %s',
epg, epg.static_paths)
self.aim.update(aim_ctx, epg, static_paths=epg.static_paths)
def _update_static_path(self, port_context, host=None, segment=None,
remove=False):
host = host or port_context.host
@@ -1579,14 +1696,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
.first())
if exist:
return
else:
if (segment.get(api.NETWORK_TYPE) in [pconst.TYPE_VLAN]):
seg = segment[api.SEGMENTATION_ID]
else:
LOG.info(_LI('Unsupported segmentation type for static path '
'binding: %s'),
segment.get(api.NETWORK_TYPE))
return
aim_ctx = aim_context.AimContext(db_session=session)
host_link = self.aim.find(aim_ctx, aim_infra.HostLink, host_name=host)
@@ -1596,18 +1705,9 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
return
host_link = host_link[0].path
epg = self._map_network_to_epg(session, port_context.network.current)
if not epg:
LOG.info(_LI('Network %s does not map to any EPG'),
port_context.network.current['id'])
return
epg = self.aim.get(aim_ctx, epg)
static_paths = [p for p in epg.static_paths
if p.get('path') != host_link]
if not remove:
static_paths.append({'path': host_link, 'encap': 'vlan-%s' % seg})
LOG.debug('Setting static paths for EPG %s to %s', epg, static_paths)
self.aim.update(aim_ctx, epg, static_paths=static_paths)
self._update_static_path_for_network(
session, port_context.network.current, segment,
**{'old_path' if remove else 'new_path': host_link})
def _release_dynamic_segment(self, port_context, use_original=False):
top = (port_context.original_top_bound_segment if use_original
@@ -1628,3 +1728,19 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
LOG.info(_LI('Releasing dynamic-segment %(s)s for port %(p)s'),
{'s': btm, 'p': port_context.current['id']})
port_context.release_dynamic_segment(btm[api.ID])
def _get_non_opflex_segments_on_host(self, context, host):
session = context.session
segments = (session.query(models.NetworkSegment)
.join(models.PortBindingLevel)
.filter_by(host=host)
.all())
net_ids = set([])
result = []
for seg in segments:
if (self._is_supported_non_opflex_type(seg[api.NETWORK_TYPE]) and
seg.network_id not in net_ids):
net = self.plugin.get_network(context, seg.network_id)
result.append((net, db._make_segment_dict(seg)))
net_ids.add(seg.network_id)
return result

View File

@@ -2559,6 +2559,99 @@ class TestPortVlanNetwork(ApicAimTestCase):
epg1 = self.aim_mgr.get(aim_ctx, epg1)
self.assertEqual([], epg1.static_paths)
def test_topology_rpc_no_ports(self):
nctx = context.get_admin_context()
aim_ctx = aim_context.AimContext(self.db_session)
net1 = self._make_network(self.fmt, 'net1', True)['network']
epg1 = self._net_2_epg(net1)
# add hostlink for h10
self.driver.update_link(nctx, 'h10', 'eth0', 'A:A', 101, 1, 19,
'topology/pod-1/paths-101/pathep-[eth1/19]')
expected_hlink10 = aim_infra.HostLink(host_name='h10',
interface_name='eth0', interface_mac='A:A',
switch_id='101', module='1', port='19',
path='topology/pod-1/paths-101/pathep-[eth1/19]')
self.assertEqual(expected_hlink10,
self.aim_mgr.get(aim_ctx, expected_hlink10))
epg1 = self.aim_mgr.get(aim_ctx, epg1)
self.assertEqual([], epg1.static_paths)
# remove hostlink for h10
self.driver.delete_link(nctx, 'h10', 'eth0', 'A:A', 0, 0, 0)
self.assertIsNone(self.aim_mgr.get(aim_ctx, expected_hlink10))
epg1 = self.aim_mgr.get(aim_ctx, epg1)
self.assertEqual([], epg1.static_paths)
def test_topology_rpc(self):
nctx = context.get_admin_context()
aim_ctx = aim_context.AimContext(self.db_session)
epgs = []
vlans = []
self._register_agent('h10', AGENT_CONF_OVS)
for x in xrange(0, 2):
net = self._make_network(self.fmt, 'net%d' % x, True)['network']
epgs.append(self._net_2_epg(net))
with self.subnet(network={'network': net}) as sub:
with self.port(subnet=sub) as p:
p = self._bind_port_to_host(p['port']['id'], 'h10')
vlans.append(self._check_binding(p['port']['id']))
epgs[x] = self.aim_mgr.get(aim_ctx, epgs[x])
self.assertEqual([], epgs[x].static_paths)
def check_epg_static_paths(link_path):
for x in range(0, len(epgs)):
epgs[x] = self.aim_mgr.get(aim_ctx, epgs[x])
expected_path = ([{'path': link_path,
'encap': 'vlan-%s' % vlans[x]}]
if link_path else [])
self.assertEqual(expected_path, epgs[x].static_paths)
# add hostlink for h10
self.driver.update_link(nctx, 'h10', 'eth0', 'A:A', 101, 1, 19,
'topology/pod-1/paths-101/pathep-[eth1/19]')
expected_hlink10 = aim_infra.HostLink(host_name='h10',
interface_name='eth0', interface_mac='A:A',
switch_id='101', module='1', port='19',
path='topology/pod-1/paths-101/pathep-[eth1/19]')
self.assertEqual(expected_hlink10,
self.aim_mgr.get(aim_ctx, expected_hlink10))
check_epg_static_paths(expected_hlink10.path)
# update link
self.driver.update_link(nctx, 'h10', 'eth0', 'A:A', 101, 1, 42,
'topology/pod-1/paths-101/pathep-[eth1/42]')
expected_hlink10.port = '42'
expected_hlink10.path = 'topology/pod-1/paths-101/pathep-[eth1/42]'
self.assertEqual(expected_hlink10,
self.aim_mgr.get(aim_ctx, expected_hlink10))
check_epg_static_paths(expected_hlink10.path)
# add another link (VPC like)
self.driver.update_link(nctx, 'h10', 'eth1', 'B:B', 201, 1, 24,
'topology/pod-1/paths-101/pathep-[eth1/42]')
expected_hlink10_sec = aim_infra.HostLink(host_name='h10',
interface_name='eth1', interface_mac='B:B',
switch_id='201', module='1', port='24',
path='topology/pod-1/paths-101/pathep-[eth1/42]')
self.assertEqual(expected_hlink10_sec,
self.aim_mgr.get(aim_ctx, expected_hlink10_sec))
check_epg_static_paths(expected_hlink10.path)
# remove second link
self.driver.delete_link(nctx, 'h10', 'eth1', 'B:B', 0, 0, 0)
self.assertIsNone(self.aim_mgr.get(aim_ctx, expected_hlink10_sec))
check_epg_static_paths(expected_hlink10.path)
# remove first link
self.driver.update_link(nctx, 'h10', 'eth0', 'A:A', 0, 0, 0, '')
self.assertIsNone(self.aim_mgr.get(aim_ctx, expected_hlink10))
check_epg_static_paths(None)
class TestPortOnPhysicalNode(TestPortVlanNetwork):
# Tests for binding port on physical node where another ML2 mechanism