[AIM] Cleanup RPC implementations

Remove the original RPC implemenation from the aim_mapping policy
driver, remove the mechanism driver config option for selecting RPC
implementations, remove the UT variations for testing the non-default
RPC implementations, and remove the support code used only by the
removed RPC implementation.

Change-Id: I0274ce3debfb414edc02d5363b8e9dbcaccad63d
This commit is contained in:
Robert Kukura 2019-02-22 17:14:02 -05:00
parent da052cb15c
commit 0481773068
8 changed files with 64 additions and 937 deletions

View File

@ -55,11 +55,6 @@ apic_opts = [
"plugin, formatted as a dictionary mapping Neutron external "
"network IDs (UUIDs) to ACI external network distinguished "
"names."),
# REVISIT: Eliminate the following option, leaving a single RPC
# implementation.
cfg.BoolOpt('enable_new_rpc',
default=True,
help=("Enable new RPC handler.")),
cfg.IntOpt('apic_nova_vm_name_cache_update_interval', default=60,
help=("How many seconds for the polling thread on each "
"controller should wait before it updates the nova vm "

View File

@ -238,21 +238,6 @@ class DbMixin(object):
return query(session).params(
network_id=network_id).one_or_none()
# REVISIT: Remove with original RPC implementation.
def _get_network_mapping_bulk(self, session, network_ids):
# REVISIT: This method is not called during any UT, and does
# not appear to be referenced elsewhere in this repository.
if not network_ids:
return []
query = BAKERY(lambda s: s.query(
NetworkMapping))
query += lambda q: q.filter(
NetworkMapping.network_id.in_(
sa.bindparam('network_ids', expanding=True)))
return query(session).params(
network_ids=network_ids).all()
def _get_network_mappings_for_vrf(self, session, vrf):
query = BAKERY(lambda s: s.query(
NetworkMapping))
@ -263,17 +248,6 @@ class DbMixin(object):
vrf_tenant_name=vrf.tenant_name,
vrf_name=vrf.name).all()
# REVISIT: Remove with original RPC implementation.
def _get_network_mappings_for_bd(self, session, bd):
query = BAKERY(lambda s: s.query(
NetworkMapping))
query += lambda q: q.filter_by(
bd_tenant_name=sa.bindparam('bd_tenant_name'),
bd_name=sa.bindparam('bd_name'))
return query(session).params(
bd_tenant_name=bd.tenant_name,
bd_name=bd.name).all()
def _is_vrf_used_by_networks(self, session, vrf):
query = BAKERY(lambda s: s.query(
NetworkMapping.network_id))
@ -530,10 +504,3 @@ class DbMixin(object):
last_incremental_update_time=last_incremental_update_time,
last_full_update_time=last_full_update_time)
session.add(db_obj)
# REVISIT: Remove with original RPC implementation.
def _delete_vm_name_update(self, session):
with session.begin(subtransactions=True):
db_obj = self._get_vm_name_update(session)
if db_obj:
session.delete(db_obj)

View File

@ -237,9 +237,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
self.enable_iptables_firewall = (cfg.CONF.ml2_apic_aim.
enable_iptables_firewall)
self.l3_domain_dn = cfg.CONF.ml2_apic_aim.l3_domain_dn
# REVISIT: Eliminate the following property, leaving a single
# RPC implementation.
self.enable_new_rpc = cfg.CONF.ml2_apic_aim.enable_new_rpc
self.apic_nova_vm_name_cache_update_interval = (cfg.CONF.ml2_apic_aim.
apic_nova_vm_name_cache_update_interval)
self._setup_nova_vm_update()
@ -3325,16 +3322,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
mapping = self._get_network_mapping(session, network['id'])
return mapping and self._get_network_epg(mapping)
# REVISIT: Remove with original RPC implementation.
def get_vrf_for_network(self, session, network):
mapping = self._get_network_mapping(session, network['id'])
return mapping and self._get_network_vrf(mapping)
# REVISIT: Remove with original RPC implementation.
def get_network_ids_for_bd(self, session, bd):
mapping = self._get_network_mappings_for_bd(session, bd)
return [m.network_id for m in mapping]
def get_aim_domains(self, aim_ctx):
vmms = [{'name': x.name, 'type': x.type}
for x in self.aim.find(aim_ctx, aim_resource.VMMDomain)
@ -4383,11 +4370,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
return [p[0] for p in port_ids]
# REVISIT: Remove with original RPC implementation.
def _get_port_network_id(self, plugin_context, port_id):
port = self.plugin.get_port(plugin_context, port_id)
return port['network_id']
def _get_svi_default_external_epg(self, network):
if not network.get(cisco_apic.SVI):
return None

View File

@ -179,13 +179,12 @@ class ApicRpcHandlerMixin(object):
conn = n_rpc.create_connection()
# Opflex RPC handler.
if self.enable_new_rpc:
self._opflex_endpoint = o_rpc.GBPServerRpcCallback(
self, self.notifier)
conn.create_consumer(
o_rpc.TOPIC_OPFLEX,
[self._opflex_endpoint],
fanout=False)
self._opflex_endpoint = o_rpc.GBPServerRpcCallback(
self, self.notifier)
conn.create_consumer(
o_rpc.TOPIC_OPFLEX,
[self._opflex_endpoint],
fanout=False)
# Topology RPC hander.
self._topology_endpoint = TopologyRpcEndpoint(self)

View File

@ -11,7 +11,6 @@
# under the License.
import hashlib
import netaddr
import re
import six
import sqlalchemy as sa
@ -21,7 +20,6 @@ from aim import aim_manager
from aim.api import resource as aim_resource
from aim import context as aim_context
from aim import utils as aim_utils
from neutron.agent.linux import dhcp
from neutron.db import api as db_api
from neutron import policy
from neutron_lib import constants as n_constants
@ -42,8 +40,6 @@ from gbpservice.neutron.extensions import cisco_apic
from gbpservice.neutron.extensions import cisco_apic_gbp as aim_ext
from gbpservice.neutron.extensions import cisco_apic_l3
from gbpservice.neutron.extensions import group_policy as gpolicy
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import (
exceptions as md_exc)
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import (
mechanism_driver as md)
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import apic_mapper
@ -81,12 +77,6 @@ AUTO_PTG_ID_PREFIX = AUTO_PTG_PREFIX + '%s'
# Definitions duplicated from apicapi lib
APIC_OWNED = 'apic_owned_'
# REVISIT: Remove with original RPC implementation.
PROMISCUOUS_TYPES = [n_constants.DEVICE_OWNER_DHCP,
n_constants.DEVICE_OWNER_LOADBALANCER]
# REVISIT: Remove with original RPC implementation.
PROMISCUOUS_SUFFIX = 'promiscuous'
CONTRACTS = 'contracts'
CONTRACT_SUBJECTS = 'contract_subjects'
FILTERS = 'filters'
@ -161,9 +151,7 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
@log.log_method_call
def start_rpc_listeners(self):
if self.aim_mech_driver.enable_new_rpc:
return []
return self.setup_opflex_rpc_listeners()
return []
def validate_state(self, repair):
mgr = aim_validation.ValidationManager()
@ -1159,12 +1147,6 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
return self._get_aim_application_profile_from_db(
context._plugin_context.session, ap)
# REVISIT: Remove with original RPC implementation.
def _get_aim_application_profile(self, session, apg):
# This gets an AP from the AIM DB
ap = self._aim_application_profile(session, apg)
return self._get_aim_application_profile_from_db(session, ap)
def _get_aim_application_profile_from_db(self, session, ap):
aim_ctx = aim_context.AimContext(session)
ap_fetched = self.aim.get(aim_ctx, ap)
@ -1455,18 +1437,6 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
aim_contract, in_filters, out_filters, bi_filters)
self.aim.create(aim_ctx, aim_contract_subject, overwrite=True)
# REVISIT: Remove with original RPC implementation.
def _get_aim_contract(self, session, policy_rule_set):
# This gets a Contract from the AIM DB
aim_ctx = aim_context.AimContext(session)
contract = self._aim_contract(session, policy_rule_set)
contract_fetched = self.aim.get(aim_ctx, contract)
if not contract_fetched:
LOG.debug("No Contract found in AIM DB")
else:
LOG.debug("Got Contract: %s", vars(contract_fetched))
return contract_fetched
def _get_aim_contract_names(self, session, prs_id_list):
contract_list = []
for prs_id in prs_id_list:
@ -1782,36 +1752,6 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
session = context._plugin_context.session
return aim_context.AimContext(session)
# REVISIT: Remove with original RPC implementation.
def _is_port_promiscuous(self, plugin_context, port):
pt = self._port_id_to_pt(plugin_context, port['id'])
if (pt and pt.get('cluster_id') and
pt.get('cluster_id') != pt['id']):
master = self._get_policy_target(plugin_context, pt['cluster_id'])
if master.get('group_default_gateway'):
return True
if (port['device_owner'] in PROMISCUOUS_TYPES or
port['name'].endswith(PROMISCUOUS_SUFFIX) or
(pt and pt.get('group_default_gateway'))):
return True
if not port.get('port_security_enabled', True):
return True
return False
# REVISIT: Remove with original RPC implementation.
def _is_dhcp_optimized(self, plugin_context, port):
return self.aim_mech_driver.enable_dhcp_opt
# REVISIT: Remove with original RPC implementation.
def _is_metadata_optimized(self, plugin_context, port):
return self.aim_mech_driver.enable_metadata_opt
# REVISIT: Remove with original RPC implementation.
def _set_dhcp_lease_time(self, details):
if self.aim_mech_driver.apic_optimized_dhcp_lease_time > 0:
details['dhcp_lease_time'] = (
self.aim_mech_driver.apic_optimized_dhcp_lease_time)
# REVISIT: Called by mechanism driver when binding a port using
# DVS. Consider a more general way for neutron ports to be bound
# using a non-default EPG.
@ -1832,273 +1772,6 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
"port %s", port['id'])
return epg
# REVISIT: Remove with original RPC implementation.
def _get_subnet_details(self, plugin_context, port):
# L2P might not exist for a pure Neutron port
l2p = self._network_id_to_l2p(plugin_context, port['network_id'])
# TODO(ivar): support shadow network
# if not l2p and self._ptg_needs_shadow_network(context, ptg):
# l2p = self._get_l2_policy(context._plugin_context,
# ptg['l2_policy_id'])
subnets = self._get_subnets(
plugin_context,
filters={'id': [ip['subnet_id'] for ip in port['fixed_ips']]})
for subnet in subnets:
dhcp_ports = {}
subnet_dhcp_ips = set()
for dhcp_port in self._get_ports(
plugin_context,
filters={
'network_id': [subnet['network_id']],
'device_owner': [n_constants.DEVICE_OWNER_DHCP]}):
dhcp_ips = set([x['ip_address'] for x in dhcp_port['fixed_ips']
if x['subnet_id'] == subnet['id']])
dhcp_ports.setdefault(dhcp_port['mac_address'], list(dhcp_ips))
subnet_dhcp_ips |= dhcp_ips
subnet_dhcp_ips = list(subnet_dhcp_ips)
if not subnet['dns_nameservers']:
# Use DHCP namespace port IP
subnet['dns_nameservers'] = subnet_dhcp_ips
# Set Default & Metadata routes if needed
default_route = metadata_route = {}
if subnet['ip_version'] == 4:
for route in subnet['host_routes']:
if route['destination'] == '0.0.0.0/0':
default_route = route
if route['destination'] == dhcp.METADATA_DEFAULT_CIDR:
metadata_route = route
if l2p and not l2p['inject_default_route']:
# In this case we do not want to send the default route
# and the metadata route. We also do not want to send
# the gateway_ip for the subnet.
if default_route:
subnet['host_routes'].remove(default_route)
if metadata_route:
subnet['host_routes'].remove(metadata_route)
del subnet['gateway_ip']
else:
# Set missing routes
if not default_route:
subnet['host_routes'].append(
{'destination': '0.0.0.0/0',
'nexthop': subnet['gateway_ip']})
optimized = self._is_metadata_optimized(plugin_context,
port)
# REVISIT: We need to decide if we should provide
# host-routes for all of the DHCP agents. For now
# use the first DHCP agent in our list for the
# metadata host-route next-hop IPs
if not metadata_route and dhcp_ports and (
not optimized or (optimized and not default_route)):
for ip in dhcp_ports[dhcp_ports.keys()[0]]:
subnet['host_routes'].append(
{'destination': dhcp.METADATA_DEFAULT_CIDR,
'nexthop': ip})
subnet['dhcp_server_ips'] = subnet_dhcp_ips
if dhcp_ports:
subnet['dhcp_server_ports'] = dhcp_ports
return subnets
# REVISIT: Remove with original RPC implementation.
def _get_nova_vm_name(self, context, port):
return self.aim_mech_driver._get_vm_name(context.session,
port['device_id'])
# REVISIT: Remove with original RPC implementation.
def _send_port_update_notification(self, plugin_context, port):
self.aim_mech_driver._notify_port_update(plugin_context, port)
# REVISIT: Remove with original RPC implementation.
def _get_aap_details(self, plugin_context, port):
aaps = port['allowed_address_pairs']
# Set the correct address ownership for this port
owned_addresses = self._get_owned_addresses(
plugin_context, port['id'])
extra_aaps = []
for allowed in aaps:
cidr = netaddr.IPNetwork(allowed['ip_address'])
if ((cidr.version == 4 and cidr.prefixlen != 32) or
(cidr.version == 6 and cidr.prefixlen != 128)):
# Never mark CIDRs as "active", but
# look for owned addresses in this CIDR, and
# if present, add them to the allowed-address-pairs
# list, and mark those as "active"
for addr in owned_addresses:
entry = {'ip_address': addr,
'mac_address': allowed['mac_address'],
'active': True}
if addr in cidr and entry not in extra_aaps:
extra_aaps.append(entry)
elif allowed['ip_address'] in owned_addresses:
# Signal the agent that this particular address is active
# on its port
allowed['active'] = True
if extra_aaps:
aaps.extend(extra_aaps)
return aaps
# REVISIT: Remove with original RPC implementation.
def _get_port_vrf(self, plugin_context, port):
net_db = self._core_plugin._get_network(plugin_context,
port['network_id'])
return self.aim_mech_driver.get_vrf_for_network(
plugin_context.session, net_db)
# REVISIT: Remove with original RPC implementation.
def _get_vrf_subnets(self, plugin_context, vrf_tenant_name, vrf_name):
session = plugin_context.session
result = []
# get all subnets of the specified VRF
with session.begin(subtransactions=True):
# Find VRF's address_scope first
address_scope_ids = (
self.aim_mech_driver._get_address_scope_ids_for_vrf(
session,
aim_resource.VRF(tenant_name=vrf_tenant_name,
name=vrf_name)))
if address_scope_ids:
for address_scope_id in address_scope_ids:
subnetpools = self._get_subnetpools(
plugin_context,
filters={'address_scope_id': [address_scope_id]})
for pool in subnetpools:
result.extend(pool['prefixes'])
else:
aim_ctx = aim_context.AimContext(db_session=session)
if vrf_tenant_name != md.COMMON_TENANT_NAME:
bds = self.aim.find(aim_ctx, aim_resource.BridgeDomain,
tenant_name=vrf_tenant_name,
vrf_name=vrf_name)
else:
bds = self.aim.find(aim_ctx, aim_resource.BridgeDomain,
vrf_name=vrf_name)
other_vrfs = self.aim.find(aim_ctx, aim_resource.VRF,
name=vrf_name)
bd_tenants = set([x.tenant_name for x in bds])
vrf_tenants = set([x.tenant_name for x in other_vrfs
if x.tenant_name != vrf_tenant_name])
valid_tenants = bd_tenants - vrf_tenants
# Only keep BDs that don't have a VRF with that name
# already
bds = [x for x in bds if x.tenant_name in valid_tenants]
# Retrieve subnets from BDs
net_ids = self._get_net_ids_from_bds(session, bds)
if net_ids:
subnets = self._get_subnets(plugin_context,
{'network_id': net_ids})
result = [x['cidr'] for x in subnets]
return result
# REVISIT: Remove with original RPC implementation.
def _get_net_ids_from_bds(self, session, bds):
net_ids = []
for bd in bds:
try:
net_ids.append(self.name_mapper.reverse_network(
session, bd.name))
except md_exc.InternalError:
# Check if BD maps to an external network
ext_ids = self.aim_mech_driver.get_network_ids_for_bd(
session, bd)
net_ids.extend(ext_ids)
# If no external network is found, we ignore reverse
# mapping failures because there may be APIC BDs in the
# concerned VRF that Neutron is unaware of. This is
# especially true for VRFs in the common tenant.
return net_ids
# REVISIT: Remove with original RPC implementation.
def _get_segmentation_labels(self, plugin_context, port):
pt = self._port_id_to_pt(plugin_context, port['id'])
if self.apic_segmentation_label_driver and pt and (
'segmentation_labels' in pt):
return pt['segmentation_labels']
# REVISIT: Remove with original RPC implementation.
def _get_nat_details(self, plugin_context, port, host, details):
""" Add information about IP mapping for DNAT/SNAT """
fips = []
ipms = []
host_snat_ips = []
# Find all external networks connected to the port.
# Handle them depending on whether there is a FIP on that
# network.
ext_nets = []
port_sn = set([x['subnet_id'] for x in port['fixed_ips']])
router_intf_ports = self._get_ports(
plugin_context,
filters={'device_owner': [n_constants.DEVICE_OWNER_ROUTER_INTF],
'fixed_ips': {'subnet_id': port_sn}})
if router_intf_ports:
routers = self._get_routers(
plugin_context,
filters={'id': [x['device_id']
for x in router_intf_ports]})
ext_nets = self._get_networks(
plugin_context,
filters={'id': [r['external_gateway_info']['network_id']
for r in routers
if r.get('external_gateway_info')]})
if not ext_nets:
return fips, ipms, host_snat_ips
# Handle FIPs of owned addresses - find other ports in the
# network whose address is owned by this port.
# If those ports have FIPs, then steal them.
fips_filter = [port['id']]
active_addrs = [a['ip_address']
for a in details['allowed_address_pairs']
if a.get('active')]
if active_addrs:
others = self._get_ports(
plugin_context,
filters={'network_id': [port['network_id']],
'fixed_ips': {'ip_address': active_addrs}})
fips_filter.extend([p['id'] for p in others
if p['id'] != port['id']])
fips = self._get_fips(plugin_context,
filters={'port_id': fips_filter})
for ext_net in ext_nets:
dn = ext_net.get(cisco_apic.DIST_NAMES, {}).get(
cisco_apic.EXTERNAL_NETWORK)
ext_net_epg_dn = ext_net.get(cisco_apic.DIST_NAMES, {}).get(
cisco_apic.EPG)
if not dn or not ext_net_epg_dn:
continue
if 'distributed' != ext_net.get(cisco_apic.NAT_TYPE):
continue
# TODO(amitbose) Handle per-tenant NAT EPG
ext_net_epg = aim_resource.EndpointGroup.from_dn(ext_net_epg_dn)
fips_in_ext_net = [fip for fip in fips
if fip['floating_network_id'] == ext_net['id']]
if not fips_in_ext_net:
ext_segment_name = dn.replace('/', ':')
ipms.append({'external_segment_name': ext_segment_name,
'nat_epg_name': ext_net_epg.name,
'nat_epg_app_profile': (
ext_net_epg.app_profile_name),
'nat_epg_tenant': ext_net_epg.tenant_name})
# TODO(amitbose) Set next_hop_ep_tenant for per-tenant NAT EPG
if host:
snat_ip = self.aim_mech_driver.get_or_allocate_snat_ip(
plugin_context, host, ext_net)
if snat_ip:
snat_ip['external_segment_name'] = ext_segment_name
host_snat_ips.append(snat_ip)
else:
for f in fips_in_ext_net:
f['nat_epg_name'] = ext_net_epg.name
f['nat_epg_app_profile'] = ext_net_epg.app_profile_name
f['nat_epg_tenant'] = ext_net_epg.tenant_name
return fips, ipms, host_snat_ips
def _get_vrf_by_dn(self, context, vrf_dn):
aim_context = self._get_aim_context(context)
vrf = self.aim.get(
@ -2346,13 +2019,6 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
else:
return False
# REVISIT: Remove with original RPC implementation.
def _get_bd_by_dn(self, context, bd_dn):
aim_context = self._get_aim_context(context)
bd = self.aim.get(
aim_context, aim_resource.BridgeDomain.from_dn(bd_dn))
return bd
def _get_epg_by_dn(self, context, epg_dn):
aim_context = self._get_aim_context(context)
epg = self.aim.get(
@ -2455,38 +2121,6 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin):
query(context._plugin_context.session).params(
pr_ids=pr_ids).all())]
# REVISIT: Remove with original RPC implementation.
def _get_port_mtu(self, context, port):
if self.advertise_mtu:
for dhcp_opt in port.get('extra_dhcp_opts'):
if (dhcp_opt.get('opt_name') == 'interface-mtu' or
dhcp_opt.get('opt_name') == '26'):
if dhcp_opt.get('opt_value'):
try:
return int(dhcp_opt['opt_value'])
except ValueError:
continue
network = self._get_network(context, port['network_id'])
return network.get('mtu')
return None
# REVISIT: Remove with original RPC implementation.
def _get_dns_domain(self, context, port):
network = self._get_network(context, port['network_id'])
return network.get('dns_domain')
# REVISIT: Remove with original RPC implementation.
def _get_nested_domain(self, context, port):
network = self._get_network(context, port['network_id'])
return (network.get('apic:nested_domain_name'),
network.get('apic:nested_domain_type'),
network.get('apic:nested_domain_infra_vlan'),
network.get('apic:nested_domain_service_vlan'),
network.get('apic:nested_domain_node_network_vlan'),
network.get('apic:nested_domain_allowed_vlans'),
self._nested_host_vlan if network.get(
'apic:nested_domain_infra_vlan') else None)
def _create_per_l3p_implicit_contracts(self):
admin_context = n_context.get_admin_context()
context = type('', (object,), {})()

View File

@ -14,16 +14,6 @@ from collections import namedtuple
import sqlalchemy as sa
from sqlalchemy.ext import baked
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import api as db_api
from neutron.db import db_base_plugin_common
from neutron.db.models import securitygroup as sg_models
from neutron.objects import base as objects_base
from neutron.objects import trunk as trunk_objects
from neutron.plugins.ml2 import rpc as ml2_rpc
from neutron_lib.api.definitions import portbindings
from opflexagent import rpc as o_rpc
from oslo_log import log
from gbpservice.neutron.db.grouppolicy.extensions import (
@ -51,348 +41,6 @@ EndpointPtInfo = namedtuple(
class AIMMappingRPCMixin(object):
"""RPC mixin for AIM mapping.
Collection of all the RPC methods consumed by the AIM mapping.
By defining the mixin requirements, we can potentially move the RPC
handling between GBP and Neutron preserving the same code base. Such
requirements might be easier to implement in some places (eg. won't
require model extensions) compared to others, based on the visibility
that each module has over the network abstraction.
"""
# REVISIT: Remove original RPC implementation - everything in this
# class before the query_endpoint_rpc_info method.
def setup_opflex_rpc_listeners(self):
self.notifier = o_rpc.AgentNotifierApi(topics.AGENT)
LOG.debug("Set up Opflex RPC listeners.")
self.opflex_endpoints = [
o_rpc.GBPServerRpcCallback(self, self.notifier)]
self.opflex_topic = o_rpc.TOPIC_OPFLEX
self.opflex_conn = n_rpc.create_connection()
self.opflex_conn.create_consumer(
self.opflex_topic, self.opflex_endpoints, fanout=False)
return self.opflex_conn.consume_in_threads()
@db_api.retry_if_session_inactive()
def _retrieve_vrf_details(self, context, **kwargs):
with context.session.begin(subtransactions=True):
details = {'l3_policy_id': kwargs['vrf_id']}
self._add_vrf_details(context, details['l3_policy_id'], details)
return details
def _get_vrf_details(self, context, **kwargs):
LOG.debug("APIC AIM handling _get_vrf_details for: %s", kwargs)
try:
return self._retrieve_vrf_details(context, **kwargs)
except Exception as e:
vrf = kwargs.get('vrf_id')
LOG.error("An exception has occurred while retrieving vrf "
"gbp details for %s", vrf)
LOG.exception(e)
return {'l3_policy_id': vrf}
def get_vrf_details(self, context, **kwargs):
return self._get_vrf_details(context, **kwargs)
def request_vrf_details(self, context, **kwargs):
return self._get_vrf_details(context, **kwargs)
def get_gbp_details(self, context, **kwargs):
LOG.debug("APIC AIM handling get_gbp_details for: %s", kwargs)
try:
return self._get_gbp_details(context, kwargs, kwargs.get('host'))
except Exception as e:
device = kwargs.get('device')
LOG.error("An exception has occurred while retrieving device "
"gbp details for %s", device)
LOG.exception(e)
return {'device': device}
def request_endpoint_details(self, context, **kwargs):
LOG.debug("APIC AIM handling get_endpoint_details for: %s", kwargs)
request = kwargs.get('request')
try:
return self._request_endpoint_details(context, **kwargs)
except Exception as e:
LOG.error("An exception has occurred while requesting device "
"gbp details for %s", request.get('device'))
LOG.exception(e)
return None
@db_api.retry_if_session_inactive()
def _request_endpoint_details(self, context, **kwargs):
request = kwargs.get('request')
host = kwargs.get('host')
result = {'device': request['device'],
'timestamp': request['timestamp'],
'request_id': request['request_id'],
'gbp_details': self._get_gbp_details(context, request,
host),
'neutron_details': ml2_rpc.RpcCallbacks(
None, None).get_device_details(context, **request),
'trunk_details': self._get_trunk_details(context,
request, host)}
return result
# Child class needs to support:
# - self._send_port_update_notification(context, port)
def ip_address_owner_update(self, context, **kwargs):
if not kwargs.get('ip_owner_info'):
return
ports_to_update = self.aim_mech_driver.update_ip_owner(
kwargs['ip_owner_info'])
for p in ports_to_update:
LOG.debug("APIC ownership update for port %s", p)
self._send_port_update_notification(context, p)
def _get_trunk_details(self, context, request, host):
if self._trunk_plugin:
device = request.get('device')
port_id = self._core_plugin._device_to_port_id(context, device)
# Find Trunk associated to this port (if any)
trunks = self._trunk_plugin.get_trunks(
context, filters={'port_id': [port_id]})
subports = None
if not trunks:
subports = self.retrieve_subports(
context, filters={'port_id': [port_id]})
if subports:
trunks = self._trunk_plugin.get_trunks(
context, filters={'id': [subports[0].trunk_id]})
if trunks:
return {'trunk_id': trunks[0]['id'],
'master_port_id': trunks[0]['port_id'],
'subports': (
[s.to_dict() for s in subports] if subports else
self._trunk_plugin.get_subports(
context, trunks[0]['id'])['sub_ports'])}
# NOTE(ivar): for some reason, the Trunk plugin doesn't expose a way to
# retrieve a subport starting from the port ID.
@db_base_plugin_common.filter_fields
def retrieve_subports(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
filters = filters or {}
pager = objects_base.Pager(sorts=sorts, limit=limit,
page_reverse=page_reverse,
marker=marker)
return trunk_objects.SubPort.get_objects(context, _pager=pager,
**filters)
# Things you need in order to run this Mixin:
# - self._core_plugin: attribute that points to the Neutron core plugin;
# - self._is_port_promiscuous(context, port): define whether or not
# a port should be put in promiscuous mode;
# - self._get_port_epg(context, port): returns the AIM EPG for the specific
# port
# for both Neutron and GBP.
# - self._is_dhcp_optimized(context, port);
# - self._is_metadata_optimized(context, port);
# - self._set_dhcp_lease_time(details)
# - self._get_dns_domain(context, port)
@db_api.retry_if_session_inactive()
def _get_gbp_details(self, context, request, host):
with context.session.begin(subtransactions=True):
device = request.get('device')
core_plugin = self._core_plugin
port_id = core_plugin._device_to_port_id(context, device)
port_context = core_plugin.get_bound_port_context(context, port_id,
host)
if not port_context:
LOG.warning("Device %(device)s requested by agent "
"%(agent_id)s not found in database",
{'device': port_id,
'agent_id': request.get('agent_id')})
return {'device': request.get('device')}
port = port_context.current
# NOTE(ivar): removed the PROXY_PORT_PREFIX hack.
# This was needed to support network services without hotplug.
epg = self._get_port_epg(context, port)
details = {'device': request.get('device'),
'enable_dhcp_optimization': self._is_dhcp_optimized(
context, port),
'enable_metadata_optimization': (
self._is_metadata_optimized(context, port)),
'port_id': port_id,
'mac_address': port['mac_address'],
'app_profile_name': epg.app_profile_name,
'tenant_id': port['tenant_id'],
'host': port[portbindings.HOST_ID],
# TODO(ivar): scope names, possibly through AIM or the
# name mapper
'ptg_tenant': epg.tenant_name,
'endpoint_group_name': epg.name,
'promiscuous_mode': self._is_port_promiscuous(context,
port),
'extra_ips': [],
'floating_ip': [],
'ip_mapping': [],
# Put per mac-address extra info
'extra_details': {}}
self._set_nova_vm_name(context, port, details)
mtu = self._get_port_mtu(context, port)
if mtu:
details['interface_mtu'] = mtu
details['dns_domain'] = self._get_dns_domain(context, port)
# NOTE(ivar): having these methods cleanly separated actually makes
# things less efficient by requiring lots of calls duplication.
# we could alleviate this by passing down a cache that stores
# commonly requested objects (like EPGs). 'details' itself could
# be used for such caching.
if port.get('security_groups'):
self._add_security_group_details(context, port, details)
vrf = self._get_port_vrf(context, port)
details['l3_policy_id'] = '%s %s' % (vrf.tenant_name, vrf.name)
self._add_subnet_details(context, port, details)
self._add_allowed_address_pairs_details(context, port, details)
self._add_vrf_details(context, details['l3_policy_id'], details)
self._add_nat_details(context, port, host, details)
self._add_extra_details(context, port, details)
self._add_segmentation_label_details(context, port, details)
self._set_dhcp_lease_time(details)
self._add_nested_domain_details(context, port, details)
LOG.debug("Details for port %s : %s", port['id'], details)
return details
def _set_nova_vm_name(self, context, port, details):
# Set VM name if needed.
if port['device_owner'].startswith(
'compute:') and port['device_id']:
vm = self._get_nova_vm_name(context, port)
if vm:
vm_name, = vm
else:
vm_name = port['device_id']
details['vm-name'] = vm_name
def _get_owned_addresses(self, plugin_context, port_id):
return set(self.aim_mech_driver.get_ha_ipaddresses_for_port(port_id))
def _add_security_group_details(self, context, port, details):
vif_details = port.get('binding:vif_details')
# For legacy VMs, they are running in this mode which means
# they will use iptables to support SG. Then we don't bother
# to configure any SG for them here.
if (vif_details and vif_details.get('port_filter') and
vif_details.get('ovs_hybrid_plug')):
return
details['security_group'] = []
if port['security_groups']:
query = BAKERY(lambda s: s.query(
sg_models.SecurityGroup.id,
sg_models.SecurityGroup.tenant_id))
query += lambda q: q.filter(
sg_models.SecurityGroup.id.in_(
sa.bindparam('sg_ids', expanding=True)))
port_sgs = query(context.session).params(
sg_ids=port['security_groups']).all()
for sg_id, tenant_id in port_sgs:
tenant_aname = self.aim_mech_driver.name_mapper.project(
context.session, tenant_id)
details['security_group'].append(
{'policy-space': tenant_aname,
'name': sg_id})
# Always include this SG which has the default arp & dhcp rules
details['security_group'].append(
{'policy-space': 'common',
'name': self.aim_mech_driver._default_sg_name})
# Child class needs to support:
# - self._get_subnet_details(context, port)
def _add_subnet_details(self, context, port, details):
# This method needs to define requirements for this Mixin's child
# classes in order to fill the following result parameters:
# - subnets;
details['subnets'] = self._get_subnet_details(context, port)
def _add_nat_details(self, context, port, host, details):
# This method needs to define requirements for this Mixin's child
# classes in order to fill the following result parameters:
# - floating_ip;
# - ip_mapping;
# - host_snat_ips.
(details['floating_ip'], details['ip_mapping'],
details['host_snat_ips']) = self._get_nat_details(
context, port, host, details)
# Child class needs to support:
# - self._get_aap_details(context, port)
def _add_allowed_address_pairs_details(self, context, port, details):
# This method needs to define requirements for this Mixin's child
# classes in order to fill the following result parameters:
# - allowed_address_pairs
# This should take care of realizing whether a given address is
# active in the specific port
details['allowed_address_pairs'] = self._get_aap_details(context, port)
# Child class needs to support:
# - self._get_vrf_subnets(context, vrf_tenant_name, vrf_name):
# Subnets managed by the specific VRF.
def _add_vrf_details(self, context, vrf_id, details):
# This method needs to define requirements for this Mixin's child
# classes in order to fill the following result parameters:
# - l3_policy_id;
# - vrf_tenant;
# - vrf_name;
# - vrf_subnets.
tenant_name, name = vrf_id.split(' ')
details['vrf_tenant'] = tenant_name
details['vrf_name'] = name
details['vrf_subnets'] = self._get_vrf_subnets(context, tenant_name,
name)
# Child class needs to support:
# - self._get_nested_domain(context, port)
def _add_nested_domain_details(self, context, port, details):
# This method needs to define requirements for this Mixin's child
# classes in order to fill the following result parameters:
# - nested_domain_name;
# - nested_domain_type;
# - nested_domain_infra_vlan;
# - nested_domain_service_vlan;
# - nested_domain_node_network_vlan;
# - nested_domain_allowed_vlans;
(details['nested_domain_name'], details['nested_domain_type'],
details['nested_domain_infra_vlan'],
details['nested_domain_service_vlan'],
details['nested_domain_node_network_vlan'],
details['nested_domain_allowed_vlans'],
details['nested_host_vlan']) = (
self._get_nested_domain(context, port))
# Child class needs to support:
# - self._get_segmentation_labels(context, port)
def _add_segmentation_label_details(self, context, port, details):
# This method needs to define requirements for this Mixin's child
# classes in order to fill the following result parameters:
# - segmentation_labels
# apic_segmentation_label is a GBP driver extension configured
# for the aim_mapping driver
details['segmentation_labels'] = self._get_segmentation_labels(
context, port)
def _add_extra_details(self, context, port, details):
# TODO(ivar): Extra details depend on HA and SC implementation
# This method needs to define requirements for this Mixin's child
# classes in order to fill per-mac address extra information.
# What is an "End of the Chain" port for Neutron?
pass
# The query_endpoint_rpc_info and update_endpoint_rpc_details
# methods below are called by the apic_aim mechanism driver while
# handling the request_endpoint_details (aka get_gbp_details) RPC

View File

@ -477,7 +477,7 @@ class TestRpcListeners(ApicAimTestCase):
return self.servers
# REVISIT: Remove new_rpc option with old RPC cleanup.
def _test_start_rpc_listeners(self, new_rpc):
def test_start_rpc_listeners(self):
# Override mock from
# neutron.tests.base.BaseTestCase.setup_rpc_mocks(), so that
# it returns servers, but still avoids starting them.
@ -488,17 +488,7 @@ class TestRpcListeners(ApicAimTestCase):
servers = self.plugin.start_rpc_listeners()
topics = [server._target.topic for server in servers]
self.assertIn('apic-service', topics)
if new_rpc:
self.assertIn('opflex', topics)
else:
self.assertNotIn('opflex', topics)
def test_start_rpc_listeners(self):
self._test_start_rpc_listeners(True)
def test_start_rpc_listeners_old_rpc(self):
self.driver.enable_new_rpc = False
self._test_start_rpc_listeners(False)
self.assertIn('opflex', topics)
def test_opflex_endpoint(self):
self.plugin.start_rpc_listeners()

View File

@ -283,13 +283,6 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
self._mech_driver = self.driver.aim_mech_driver
return self._mech_driver
# REVISIT: As part of RPC cleanup, eliminate this property and
# globally replace self.rpc_handler with self.mech_driver.
@property
def rpc_handler(self):
return (self.mech_driver if self.mech_driver.enable_new_rpc
else self.driver)
@property
def dummy(self):
# dummy policy driver reference
@ -2065,7 +2058,7 @@ class TestL2PolicyWithAutoPTG(TestL2PolicyBase):
with self.port(subnet=subnet) as port:
port_id = port['port']['id']
self._bind_port_to_host(port_id, 'h1')
mapping = self.rpc_handler.get_gbp_details(
mapping = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % port_id,
host='h1')
self.assertEqual(
@ -2100,7 +2093,7 @@ class TestL2PolicyWithAutoPTG(TestL2PolicyBase):
policy_target_group_id=ptg['id'])['policy_target']
self._bind_port_to_host(pt['port_id'], 'h1')
port = self._plugin.get_port(self._context, pt['port_id'])
mapping = self.rpc_handler.get_gbp_details(
mapping = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % pt['port_id'],
host='h1')
ap_name = self.driver.apic_ap_name_for_application_policy_group(
@ -2111,7 +2104,7 @@ class TestL2PolicyWithAutoPTG(TestL2PolicyBase):
ptg = self.update_policy_target_group(
ptg['id'], expected_res_status=200,
application_policy_group_id=new_apg['id'])['policy_target_group']
mapping = self.rpc_handler.get_gbp_details(
mapping = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % pt['port_id'],
host='h1')
ap_name = self.driver.apic_ap_name_for_application_policy_group(
@ -2642,10 +2635,8 @@ class TestPolicyTargetGroupRollback(AIMBaseTestCase):
class TestGbpDetailsForML2(AIMBaseTestCase,
test_securitygroup.SecurityGroupsTestCase):
# REVISIT: Once the new RPC handler implementation in the apic_aim
# mechanism driver is complete and tested, move this unit test
# class to test_apic_aim (or a new module) and remove the
# enable_new_rpc flag.
# REVISIT: Move this unit test class to the MD tests
# (test_apic_aim or a new module)?
def setUp(self, *args, **kwargs):
super(TestGbpDetailsForML2, self).setUp(*args, **kwargs)
@ -2739,9 +2730,7 @@ class TestGbpDetailsForML2(AIMBaseTestCase,
'prefixlen': int(prefix)},
mapping['host_snat_ips'][0])
def _do_test_get_gbp_details(self, pre_vrf=None,
enable_new_rpc=True):
self.mech_driver.enable_new_rpc = enable_new_rpc
def _do_test_get_gbp_details(self, pre_vrf=None):
self.mech_driver.apic_optimized_dhcp_lease_time = 100
ext_net1, rtr1, ext_net1_sub = self._setup_external_network(
'es1', dn='uni/tn-t1/out-l1/instP-n1')
@ -2801,10 +2790,10 @@ class TestGbpDetailsForML2(AIMBaseTestCase,
fip = self._make_floatingip(self.fmt, ext_net1_sub['network_id'],
port_id=p1['id'])['floatingip']
mapping = self.rpc_handler.get_gbp_details(
mapping = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='h1')
req_mapping = self.rpc_handler.request_endpoint_details(
req_mapping = self.mech_driver.request_endpoint_details(
nctx.get_admin_context(),
request={'device': 'tap%s' % p1['id'],
'timestamp': 0, 'request_id': 'request_id'},
@ -2859,10 +2848,10 @@ class TestGbpDetailsForML2(AIMBaseTestCase,
'opt_value': 'garbage'}]}}
port = self._update('ports', port['id'], data)['port']
mapping = self.rpc_handler.get_gbp_details(
mapping = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p2['id'],
host='h2')
req_mapping = self.rpc_handler.request_endpoint_details(
req_mapping = self.mech_driver.request_endpoint_details(
nctx.get_admin_context(),
request={'device': 'tap%s' % p2['id'],
'timestamp': 0, 'request_id': 'request_id'},
@ -2908,7 +2897,7 @@ class TestGbpDetailsForML2(AIMBaseTestCase,
data = {'port': {'extra_dhcp_opts': [{'opt_name': 'interface-mtu',
'opt_value': '2000'}]}}
port = self._update('ports', port['id'], data)['port']
mapping = self.rpc_handler.get_gbp_details(
mapping = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p2['id'],
host='h2')
self.assertEqual(2000, mapping['interface_mtu'])
@ -2916,9 +2905,6 @@ class TestGbpDetailsForML2(AIMBaseTestCase,
def test_get_gbp_details(self):
self._do_test_get_gbp_details()
def test_get_gbp_details_with_old_rpc(self):
self._do_test_get_gbp_details(enable_new_rpc=False)
def test_get_gbp_details_pre_existing_vrf(self):
aim_ctx = aim_context.AimContext(self.db_session)
vrf = self.aim_mgr.create(
@ -2926,14 +2912,6 @@ class TestGbpDetailsForML2(AIMBaseTestCase,
monitored=True))
self._do_test_get_gbp_details(pre_vrf=vrf)
def test_get_gbp_details_pre_existing_vrf_with_old_rpc(self):
aim_ctx = aim_context.AimContext(self.db_session)
vrf = self.aim_mgr.create(
aim_ctx, aim_resource.VRF(tenant_name='common', name='ctx1',
monitored=True))
self._do_test_get_gbp_details(pre_vrf=vrf,
enable_new_rpc=False)
class TestPolicyTarget(AIMBaseTestCase,
test_securitygroup.SecurityGroupsTestCase):
@ -3371,8 +3349,7 @@ class TestPolicyTarget(AIMBaseTestCase,
'prefixlen': int(prefix)},
mapping['host_snat_ips'][0])
def _do_test_get_gbp_details(self, pre_vrf=None, enable_new_rpc=True):
self.mech_driver.enable_new_rpc = enable_new_rpc
def _do_test_get_gbp_details(self, pre_vrf=None):
self.mech_driver.apic_optimized_dhcp_lease_time = 100
es1, es1_sub = self._setup_external_segment(
'es1', dn='uni/tn-t1/out-l1/instP-n1')
@ -3404,13 +3381,13 @@ class TestPolicyTarget(AIMBaseTestCase,
fip = self._make_floatingip(self.fmt, es1_sub['network_id'],
port_id=pt1['port_id'])['floatingip']
mapping = self.rpc_handler.get_gbp_details(
mapping = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % pt1['port_id'],
host='h1')
if 'apic_segmentation_label' in self._extension_drivers:
self.assertItemsEqual(segmentation_labels,
mapping['segmentation_labels'])
req_mapping = self.rpc_handler.request_endpoint_details(
req_mapping = self.mech_driver.request_endpoint_details(
nctx.get_admin_context(),
request={'device': 'tap%s' % pt1['port_id'],
'timestamp': 0, 'request_id': 'request_id'},
@ -3463,7 +3440,7 @@ class TestPolicyTarget(AIMBaseTestCase,
data = {'port': {'extra_dhcp_opts': [{'opt_name': '26',
'opt_value': 'garbage'}]}}
port = self._update('ports', port['id'], data)['port']
mapping = self.rpc_handler.get_gbp_details(
mapping = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % pt2['port_id'],
host='h1')
self.assertEqual(pt2['port_id'], mapping['port_id'])
@ -3495,14 +3472,13 @@ class TestPolicyTarget(AIMBaseTestCase,
data = {'port': {'extra_dhcp_opts': [{'opt_name': 'interface-mtu',
'opt_value': '2000'}]}}
port = self._update('ports', port['id'], data)['port']
mapping = self.rpc_handler.get_gbp_details(
mapping = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % pt2['port_id'],
host='h1')
self.assertEqual(2000, mapping['interface_mtu'])
def _do_test_gbp_details_no_pt(self, use_as=True, routed=True,
pre_vrf=None, enable_new_rpc=True):
self.mech_driver.enable_new_rpc = enable_new_rpc
pre_vrf=None):
# Create port and bind it
address_scope = self._make_address_scope_for_vrf(
pre_vrf.dn if pre_vrf else None,
@ -3556,11 +3532,11 @@ class TestPolicyTarget(AIMBaseTestCase,
'port_id': port_id}})
self._bind_port_to_host(port_id, 'h1')
mapping = self.rpc_handler.get_gbp_details(
mapping = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % port_id,
host='h1')
self.assertEqual('mydomain.', mapping['dns_domain'])
req_mapping = self.rpc_handler.request_endpoint_details(
req_mapping = self.mech_driver.request_endpoint_details(
nctx.get_admin_context(),
request={'device': 'tap%s' % port_id,
'timestamp': 0, 'request_id': 'request_id'},
@ -3579,7 +3555,7 @@ class TestPolicyTarget(AIMBaseTestCase,
vrf_tenant = self.name_mapper.project(None,
self._tenant_id)
vrf_id = '%s %s' % (vrf_tenant, vrf_name)
vrf_mapping = self.rpc_handler.get_vrf_details(
vrf_mapping = self.mech_driver.get_vrf_details(
self._neutron_admin_context, vrf_id=vrf_id)
epg_name = self.name_mapper.network(
@ -3612,7 +3588,7 @@ class TestPolicyTarget(AIMBaseTestCase,
data = {'port': {'extra_dhcp_opts': [{'opt_name': '26',
'opt_value': '2100'}]}}
port = self._update('ports', port_id, data)['port']
mapping = self.rpc_handler.get_gbp_details(
mapping = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % port_id,
host='h1')
self.assertEqual(2100, mapping['interface_mtu'])
@ -3620,9 +3596,6 @@ class TestPolicyTarget(AIMBaseTestCase,
def test_get_gbp_details(self):
self._do_test_get_gbp_details()
def test_get_gbp_details_with_old_rpc(self):
self._do_test_get_gbp_details(enable_new_rpc=False)
def test_get_gbp_details_pre_existing_vrf(self):
aim_ctx = aim_context.AimContext(self.db_session)
vrf = self.aim_mgr.create(
@ -3630,23 +3603,11 @@ class TestPolicyTarget(AIMBaseTestCase,
monitored=True))
self._do_test_get_gbp_details(pre_vrf=vrf)
def test_get_gbp_details_pre_existing_vrf_with_old_rpc(self):
aim_ctx = aim_context.AimContext(self.db_session)
vrf = self.aim_mgr.create(
aim_ctx, aim_resource.VRF(tenant_name='common', name='ctx1',
monitored=True))
self._do_test_get_gbp_details(pre_vrf=vrf, enable_new_rpc=False)
def test_get_gbp_details_no_pt(self):
# Test that traditional Neutron ports behave correctly from the
# RPC perspective
self._do_test_gbp_details_no_pt()
def test_get_gbp_details_no_pt_with_old_rpc(self):
# Test that traditional Neutron ports behave correctly from the
# RPC perspective
self._do_test_gbp_details_no_pt(enable_new_rpc=False)
def test_get_gbp_details_no_pt_pre_existing_vrf(self):
aim_ctx = aim_context.AimContext(self.db_session)
vrf = self.aim_mgr.create(
@ -3654,28 +3615,13 @@ class TestPolicyTarget(AIMBaseTestCase,
monitored=True))
self._do_test_gbp_details_no_pt(pre_vrf=vrf)
def test_get_gbp_details_no_pt_pre_existing_vrf_with_old_rpc(self):
aim_ctx = aim_context.AimContext(self.db_session)
vrf = self.aim_mgr.create(
aim_ctx, aim_resource.VRF(tenant_name='common', name='ctx1',
monitored=True))
self._do_test_gbp_details_no_pt(pre_vrf=vrf, enable_new_rpc=False)
def test_get_gbp_details_no_pt_no_as(self):
self._do_test_gbp_details_no_pt(use_as=False)
def test_get_gbp_details_no_pt_no_as_with_old_rpc(self):
self._do_test_gbp_details_no_pt(use_as=False, enable_new_rpc=False)
def test_get_gbp_details_no_pt_no_as_unrouted(self):
self._do_test_gbp_details_no_pt(use_as=False, routed=False)
def test_get_gbp_details_no_pt_no_as_unrouted_with_old_rpc(self):
self._do_test_gbp_details_no_pt(use_as=False, routed=False,
enable_new_rpc=False)
def _test_gbp_details_ext_net_no_pt(self, enable_new_rpc=True):
self.mech_driver.enable_new_rpc = enable_new_rpc
def test_gbp_details_ext_net_no_pt(self):
# Test ports created on Neutron external networks
ext_net1, _, sn1 = self._setup_external_network(
'l1', dn='uni/tn-common/out-l1/instP-n1')
@ -3704,10 +3650,10 @@ class TestPolicyTarget(AIMBaseTestCase,
sn1_1 = self._make_subnet(self.fmt, {'network': ext_net1},
'200.200.0.1', '200.200.0.0/16')
mapping = self.rpc_handler.get_gbp_details(
mapping = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % port_id,
host='h1')
req_mapping = self.rpc_handler.request_endpoint_details(
req_mapping = self.mech_driver.request_endpoint_details(
self._neutron_admin_context,
request={'device': 'tap%s' % port_id,
'timestamp': 0, 'request_id': 'request_id'},
@ -3717,7 +3663,7 @@ class TestPolicyTarget(AIMBaseTestCase,
map_tenant_name=False, prefix_ap_name=True)
vrf_id = '%s %s' % ("common", "openstack_EXT-l1")
vrf_mapping = self.rpc_handler.get_vrf_details(
vrf_mapping = self.mech_driver.get_vrf_details(
self._neutron_admin_context, vrf_id=vrf_id)
supernet = [sn1['subnet']['cidr'], sn1_1['subnet']['cidr']]
@ -3732,10 +3678,10 @@ class TestPolicyTarget(AIMBaseTestCase,
sn2_1 = self._make_subnet(self.fmt, {'network': ext_net2},
'250.250.0.1', '250.250.0.0/16')
mapping = self.rpc_handler.get_gbp_details(
mapping = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % port_id,
host='h1')
req_mapping = self.rpc_handler.request_endpoint_details(
req_mapping = self.mech_driver.request_endpoint_details(
nctx.get_admin_context(),
request={'device': 'tap%s' % port_id,
'timestamp': 0, 'request_id': 'request_id'},
@ -3745,7 +3691,7 @@ class TestPolicyTarget(AIMBaseTestCase,
map_tenant_name=False)
vrf_id = '%s %s' % ("t1", "EXT-l2")
vrf_mapping = self.rpc_handler.get_vrf_details(
vrf_mapping = self.mech_driver.get_vrf_details(
self._neutron_admin_context, vrf_id=vrf_id)
supernet = [sn2['subnet']['cidr'], sn2_1['subnet']['cidr']]
@ -3754,12 +3700,6 @@ class TestPolicyTarget(AIMBaseTestCase,
self._verify_vrf_details_assertions(
vrf_mapping, "EXT-l2", vrf_id, supernet, "t1")
def test_gbp_details_ext_net_no_pt(self):
self._test_gbp_details_ext_net_no_pt()
def test_gbp_details_ext_net_no_pt_with_old_rpc(self):
self._test_gbp_details_ext_net_no_pt(enable_new_rpc=False)
def test_ip_address_owner_update(self):
l3p = self.create_l3_policy(name='myl3')['l3_policy']
l2p = self.create_l2_policy(name='myl2',
@ -3779,7 +3719,7 @@ class TestPolicyTarget(AIMBaseTestCase,
self.mech_driver._notify_port_update = mock.Mock()
# set new owner
self.rpc_handler.ip_address_owner_update(self._context,
self.mech_driver.ip_address_owner_update(self._context,
ip_owner_info=ip_owner_info, host='h1')
obj = self.mech_driver.get_port_for_ha_ipaddress(
'1.2.3.4', net_id)
@ -3791,7 +3731,7 @@ class TestPolicyTarget(AIMBaseTestCase,
# update existing owner
self.mech_driver._notify_port_update.reset_mock()
ip_owner_info['port'] = pt2['port_id']
self.rpc_handler.ip_address_owner_update(self._context,
self.mech_driver.ip_address_owner_update(self._context,
ip_owner_info=ip_owner_info, host='h2')
obj = self.mech_driver.get_port_for_ha_ipaddress(
'1.2.3.4', net_id)
@ -5668,7 +5608,7 @@ class TestNestedDomain(AIMBaseTestCase):
p1 = self._make_port(self.fmt, net['network']['id'],
device_owner='compute:')['port']
p1 = self._bind_port_to_host(p1['id'], 'host1')['port']
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='host1')
self.assertEqual('myk8s', details['nested_domain_name'])
@ -5700,7 +5640,7 @@ class TestNestedDomain(AIMBaseTestCase):
p1 = self._make_port(self.fmt, net['network']['id'],
device_owner='compute:')['port']
p1 = self._bind_port_to_host(p1['id'], 'host1')['port']
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='host1')
self.assertEqual('', details['nested_domain_name'])
@ -5712,13 +5652,6 @@ class TestNestedDomain(AIMBaseTestCase):
self.assertIsNone(details['nested_host_vlan'])
class TestNestedDomainWithOldRpc(TestNestedDomain):
def setUp(self, **kwargs):
super(TestNestedDomainWithOldRpc, self).setUp(**kwargs)
self.mech_driver.enable_new_rpc = False
class TestNeutronPortOperation(AIMBaseTestCase):
def setUp(self, **kwargs):
@ -5740,7 +5673,7 @@ class TestNeutronPortOperation(AIMBaseTestCase):
p1 = self._make_port(self.fmt, net['network']['id'],
device_owner='compute:')['port']
p1 = self._bind_port_to_host(p1['id'], 'host1')['port']
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='host1')
self.assertFalse(details['promiscuous_mode'])
@ -5750,7 +5683,7 @@ class TestNeutronPortOperation(AIMBaseTestCase):
device_owner='compute:',
port_security_enabled=True)['port']
p2 = self._bind_port_to_host(p2['id'], 'host1')['port']
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p2['id'],
host='host1')
self.assertFalse(details['promiscuous_mode'])
@ -5760,7 +5693,7 @@ class TestNeutronPortOperation(AIMBaseTestCase):
device_owner='compute:',
port_security_enabled=False)['port']
p3 = self._bind_port_to_host(p3['id'], 'host1')['port']
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p3['id'],
host='host1')
self.assertTrue(details['promiscuous_mode'])
@ -5772,7 +5705,7 @@ class TestNeutronPortOperation(AIMBaseTestCase):
p1_dhcp = self._make_port(self.fmt, net['network']['id'],
device_owner=n_constants.DEVICE_OWNER_DHCP)['port']
p1_dhcp = self._bind_port_to_host(p1_dhcp['id'], 'host1')['port']
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1_dhcp['id'],
host='host1')
self.assertTrue(details['promiscuous_mode'])
@ -5781,7 +5714,7 @@ class TestNeutronPortOperation(AIMBaseTestCase):
arg_list=('port_security_enabled',), port_security_enabled=True,
device_owner=n_constants.DEVICE_OWNER_DHCP)['port']
p2_dhcp = self._bind_port_to_host(p2_dhcp['id'], 'host1')['port']
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p2_dhcp['id'],
host='host1')
self.assertTrue(details['promiscuous_mode'])
@ -5790,7 +5723,7 @@ class TestNeutronPortOperation(AIMBaseTestCase):
arg_list=('port_security_enabled',), port_security_enabled=False,
device_owner=n_constants.DEVICE_OWNER_DHCP)['port']
p3_dhcp = self._bind_port_to_host(p3_dhcp['id'], 'host1')['port']
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p3_dhcp['id'],
host='host1')
self.assertTrue(details['promiscuous_mode'])
@ -5833,12 +5766,12 @@ class TestNeutronPortOperation(AIMBaseTestCase):
# Call agent => plugin RPC to get the details for each port. The
# results should only have the configured AAPs, with none of them
# active.
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='h1')
self.assertEqual(sorted(allow_addr),
sorted(details['allowed_address_pairs']))
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p2['id'],
host='h2')
self.assertEqual(sorted(allow_addr),
@ -5850,7 +5783,7 @@ class TestNeutronPortOperation(AIMBaseTestCase):
'network_id': p1['network_id']}
self.mech_driver.update_ip_owner(ip_owner_info)
# Call RPC sent by the agent to get the details for p1
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='h1')
@ -5883,7 +5816,7 @@ class TestNeutronPortOperation(AIMBaseTestCase):
'network_id': p2['network_id']}
self.mech_driver.update_ip_owner(ip_owner_info)
# Call RPC sent by the agent to get the details for p2
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p2['id'],
host='h2')
expected_aaps2 = _get_expected_aaps(allow_addr, owned_addr[1])
@ -5911,14 +5844,14 @@ class TestNeutronPortOperation(AIMBaseTestCase):
port_id=p3['id'])['floatingip']
fip2 = self._make_floatingip(self.fmt, net_ext['id'],
port_id=p4['id'])['floatingip']
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='h1')
self.assertEqual(1, len(details['floating_ip']))
self.assertEqual(
fip1['floating_ip_address'],
details['floating_ip'][0]['floating_ip_address'])
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p2['id'],
host='h2')
self.assertEqual(1, len(details['floating_ip']))
@ -5959,7 +5892,7 @@ class TestNeutronPortOperation(AIMBaseTestCase):
'ip_address_v4': update_owned_addr[0],
'network_id': p1['network_id']}
self.mech_driver.update_ip_owner(ip_owner_info)
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='h1')
expected_aaps3 = _get_expected_aaps(update_addr, update_owned_addr[0])
@ -5976,7 +5909,7 @@ class TestNeutronPortOperation(AIMBaseTestCase):
'ip_address_v4': update_owned_addr[1],
'network_id': p2['network_id']}
self.mech_driver.update_ip_owner(ip_owner_info)
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p2['id'],
host='h2')
expected_aaps4 = _get_expected_aaps(update_addr, update_owned_addr[1])
@ -6027,45 +5960,38 @@ class TestNeutronPortOperation(AIMBaseTestCase):
# First test an unbound port
p1 = self._make_port(self.fmt, net['network']['id'],
device_owner='compute:')['port']
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='h1')
self.assertEqual('', details.get('host', ''))
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='h2')
self.assertEqual('', details.get('host', ''))
# Test port bound to h2, queries from h1 and h2
p1 = self._bind_port_to_host(p1['id'], 'h2')['port']
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='h1')
self.assertEqual('h2', details.get('host', 'h2'))
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='h2')
self.assertEqual('h2', details['host'])
# Test rebind of port to h1, queries from h1 and h2
p1 = self._bind_port_to_host(p1['id'], 'h1')['port']
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='h1')
self.assertEqual('h1', details['host'])
details = self.rpc_handler.get_gbp_details(
details = self.mech_driver.get_gbp_details(
self._neutron_admin_context, device='tap%s' % p1['id'],
host='h2')
self.assertEqual('h1', details.get('host', 'h1'))
class TestNeutronPortOperationWithOldRpc(TestNeutronPortOperation):
def setUp(self, **kwargs):
super(TestNeutronPortOperationWithOldRpc, self).setUp(**kwargs)
self.mech_driver.enable_new_rpc = False
class TestPerL3PImplicitContractsConfig(TestL2PolicyWithAutoPTG):
def setUp(self, **kwargs):
@ -6105,7 +6031,7 @@ class TestVlanAwareVM(AIMBaseTestCase):
'segmentation_type': 'vlan',
'segmentation_id': 100}]})
self._bind_port_to_host(port_id, 'h1')
req_mapping = self.rpc_handler.request_endpoint_details(
req_mapping = self.mech_driver.request_endpoint_details(
nctx.get_admin_context(),
request={'device': 'tap%s' % port_id,
'timestamp': 0, 'request_id': 'request_id'},
@ -6122,7 +6048,7 @@ class TestVlanAwareVM(AIMBaseTestCase):
'segmentation_id': 100}])
# Retrieve the subport
self._bind_port_to_host(subp_id, 'h1')
req_mapping = self.rpc_handler.request_endpoint_details(
req_mapping = self.mech_driver.request_endpoint_details(
nctx.get_admin_context(),
request={'device': 'tap%s' % subp_id,
'timestamp': 0, 'request_id': 'request_id'},
@ -6142,20 +6068,13 @@ class TestVlanAwareVM(AIMBaseTestCase):
self._do_test_gbp_details_no_pt()
class TestVlanAwareVMWithOldRpc(TestVlanAwareVM):
def setUp(self, **kwargs):
super(TestVlanAwareVMWithOldRpc, self).setUp(**kwargs)
self.mech_driver.enable_new_rpc = False
class TestL2PolicyRouteInjection(AIMBaseTestCase):
def _verify_rpc_response(self, port_id, inject, metadata):
# Invoke request_endpoint_details RPC handler.
request = {'device': 'tap%s' % port_id, 'timestamp': 0,
'request_id': 'a_request_id'}
response = self.rpc_handler.request_endpoint_details(
response = self.mech_driver.request_endpoint_details(
nctx.get_admin_context(), request=request, host='host1')
# Check subnet details.
@ -6204,10 +6123,3 @@ class TestL2PolicyRouteInjection(AIMBaseTestCase):
def test_route_injection_off(self):
self._test_route_injection(False)
class TestL2PolicyRouteInjectionWithOldRpc(TestL2PolicyRouteInjection):
def setUp(self, **kwargs):
super(TestL2PolicyRouteInjectionWithOldRpc, self).setUp(**kwargs)
self.mech_driver.enable_new_rpc = False