oflex 1.1 compliance

Closes-Bug: 1547694
Change-Id: Ie5a5869b566a03ed34a98e0e0ea06b246d672bb3
This commit is contained in:
Ivar Lazzaro
2016-02-18 15:07:59 -08:00
parent c9c581abdd
commit d282835e51
3 changed files with 208 additions and 141 deletions

View File

@@ -28,6 +28,7 @@ from neutron import context as nctx
from neutron.db import db_base_plugin_v2 as n_db
from neutron.extensions import portbindings
from neutron import manager
from neutron.plugins.ml2 import rpc as neu_rpc
from opflexagent import constants as ofcst
from opflexagent import rpc
from oslo_concurrency import lockutils
@@ -204,8 +205,8 @@ class ApicMappingDriver(api.ResourceMappingDriver,
def initialize(self):
super(ApicMappingDriver, self).initialize()
self._setup_rpc_listeners()
self._setup_rpc()
self._setup_rpc_listeners()
self.apic_manager = ApicMappingDriver.get_apic_manager()
self.name_mapper = name_manager.ApicNameManager(self.apic_manager)
self.enable_dhcp_opt = self.apic_manager.enable_optimized_dhcp
@@ -213,7 +214,7 @@ class ApicMappingDriver(api.ResourceMappingDriver,
self._gbp_plugin = None
def _setup_rpc_listeners(self):
self.endpoints = [rpc.GBPServerRpcCallback(self)]
self.endpoints = [rpc.GBPServerRpcCallback(self, self.notifier)]
self.topic = rpc.TOPIC_OPFLEX
self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer(self.topic, self.endpoints,
@@ -256,144 +257,173 @@ class ApicMappingDriver(api.ResourceMappingDriver,
self._add_vrf_details(context, details)
return details
def request_vrf_details(self, context, **kwargs):
return self.get_vrf_details(context, **kwargs)
# RPC Method
def _get_gbp_details(self, context, **kwargs):
port_id = self._core_plugin._device_to_port_id(
context, kwargs['device'])
port_context = self._core_plugin.get_bound_port_context(
context, port_id, kwargs['host'])
if not port_context:
LOG.warning(_("Device %(device)s requested by agent "
"%(agent_id)s not found in database"),
{'device': port_id,
'agent_id': kwargs.get('agent_id')})
return {'device': kwargs.get('device')}
port = port_context.current
# retrieve PTG from a given Port
ptg, pt = self._port_id_to_ptg(context, port['id'])
context._plugin = self.gbp_plugin
context._plugin_context = context
switched = False
if pt and pt['description'].startswith(PROXY_PORT_PREFIX):
new_id = pt['description'].replace(
PROXY_PORT_PREFIX, '').rstrip(' ')
try:
LOG.debug("Replace port %s with port %s", port_id, new_id)
port = self._get_port(context, new_id)
ptg, pt = self._port_id_to_ptg(context, port['id'])
switched = True
except n_exc.PortNotFound:
LOG.warning(_("Proxied port %s could not be found"),
new_id)
l2p = self._network_id_to_l2p(context, port['network_id'])
if not ptg and not l2p:
return None
l2_policy_id = l2p['id']
if ptg:
ptg_tenant = self._tenant_by_sharing_policy(ptg)
endpoint_group_name = self.name_mapper.policy_target_group(
context, ptg)
else:
ptg_tenant = self._tenant_by_sharing_policy(l2p)
endpoint_group_name = self.name_mapper.l2_policy(
context, l2p, prefix=SHADOW_PREFIX)
def is_port_promiscuous(port):
if (pt and pt.get('cluster_id') and
pt.get('cluster_id') != pt['id']):
master = self._get_policy_target(context, pt['cluster_id'])
if master.get('group_default_gateway'):
return True
return (port['device_owner'] in PROMISCUOUS_TYPES or
port['name'].endswith(PROMISCUOUS_SUFFIX)) or (
pt and pt.get('group_default_gateway'))
details = {'device': kwargs.get('device'),
'port_id': port_id,
'mac_address': port['mac_address'],
'app_profile_name': str(
self.apic_manager.app_profile_name),
'l2_policy_id': l2_policy_id,
'l3_policy_id': l2p['l3_policy_id'],
'tenant_id': port['tenant_id'],
'host': port[portbindings.HOST_ID],
'ptg_tenant': self.apic_manager.apic.fvTenant.name(
ptg_tenant),
'endpoint_group_name': str(endpoint_group_name),
'promiscuous_mode': is_port_promiscuous(port),
'extra_ips': [],
'floating_ip': [],
'ip_mapping': [],
# Put per mac-address extra info
'extra_details': {}}
if switched:
details['fixed_ips'] = port['fixed_ips']
if port['device_owner'].startswith('compute:') and port[
'device_id']:
vm = nclient.NovaClient().get_server(port['device_id'])
details['vm-name'] = vm.name if vm else port['device_id']
l3_policy = context._plugin.get_l3_policy(context,
l2p['l3_policy_id'])
own_addr = set()
if pt:
own_addr = set(self._get_owned_addresses(context,
pt['port_id']))
own_addr |= set(self._get_owned_addresses(context, port_id))
(details['floating_ip'], details['ip_mapping'],
details['host_snat_ips']) = (
self._get_ip_mapping_details(
context, port['id'], l3_policy, pt=pt,
owned_addresses=own_addr, host=kwargs['host']))
self._add_network_details(context, port, details, pt=pt,
owned=own_addr, inject_default_route=
l2p['inject_default_route'])
self._add_vrf_details(context, details)
if self._is_pt_chain_head(context, pt, ptg, owned_ips=own_addr):
# is a relevant proxy_gateway, push all the addresses from this
# chain to this PT
extra_map = details
master_mac = self._is_master_owner(context, pt,
owned_ips=own_addr)
if master_mac:
extra_map = details['extra_details'].setdefault(
master_mac, {'extra_ips': [], 'floating_ip': [],
'ip_mapping': [], 'host_snat_ips': []})
if bool(master_mac) == bool(pt['cluster_id']):
l3_policy = context._plugin.get_l3_policy(
context, l2p['l3_policy_id'])
while ptg['proxied_group_id']:
proxied = self.gbp_plugin.get_policy_target_group(
context, ptg['proxied_group_id'])
for port in self._get_ptg_ports(proxied):
extra_map['extra_ips'].extend(
[x['ip_address'] for x in port['fixed_ips']])
(fips, ipms, host_snat_ips) = (
self._get_ip_mapping_details(
context, port['id'], l3_policy,
host=kwargs['host']))
extra_map['floating_ip'].extend(fips)
if not extra_map['ip_mapping']:
extra_map['ip_mapping'].extend(ipms)
if not extra_map['host_snat_ips']:
extra_map['host_snat_ips'].extend(
host_snat_ips)
ptg = proxied
else:
LOG.info(_("Active master has changed for PT %s"),
pt['id'])
# There's no master mac even if a cluster_id is set.
# Active chain head must have changed in a concurrent
# operation, get out of here
pass
return details
# RPC Method
def get_gbp_details(self, context, **kwargs):
try:
port_id = self._core_plugin._device_to_port_id(
context, kwargs['device'])
port_context = self._core_plugin.get_bound_port_context(
context, port_id, kwargs['host'])
if not port_context:
LOG.warning(_("Device %(device)s requested by agent "
"%(agent_id)s not found in database"),
{'device': port_id,
'agent_id': kwargs.get('agent_id')})
return
port = port_context.current
# retrieve PTG from a given Port
ptg, pt = self._port_id_to_ptg(context, port['id'])
context._plugin = self.gbp_plugin
context._plugin_context = context
switched = False
if pt and pt['description'].startswith(PROXY_PORT_PREFIX):
new_id = pt['description'].replace(
PROXY_PORT_PREFIX, '').rstrip(' ')
try:
LOG.debug("Replace port %s with port %s", port_id, new_id)
port = self._get_port(context, new_id)
ptg, pt = self._port_id_to_ptg(context, port['id'])
switched = True
except n_exc.PortNotFound:
LOG.warning(_("Proxied port %s could not be found"),
new_id)
l2p = self._network_id_to_l2p(context, port['network_id'])
if not ptg and not l2p:
return
l2_policy_id = l2p['id']
if ptg:
ptg_tenant = self._tenant_by_sharing_policy(ptg)
endpoint_group_name = self.name_mapper.policy_target_group(
context, ptg)
else:
ptg_tenant = self._tenant_by_sharing_policy(l2p)
endpoint_group_name = self.name_mapper.l2_policy(
context, l2p, prefix=SHADOW_PREFIX)
def is_port_promiscuous(port):
if (pt and pt.get('cluster_id') and
pt.get('cluster_id') != pt['id']):
master = self._get_policy_target(context, pt['cluster_id'])
if master.get('group_default_gateway'):
return True
return (port['device_owner'] in PROMISCUOUS_TYPES or
port['name'].endswith(PROMISCUOUS_SUFFIX)) or (
pt and pt.get('group_default_gateway'))
details = {'device': kwargs.get('device'),
'port_id': port_id,
'mac_address': port['mac_address'],
'app_profile_name': str(
self.apic_manager.app_profile_name),
'l2_policy_id': l2_policy_id,
'l3_policy_id': l2p['l3_policy_id'],
'tenant_id': port['tenant_id'],
'host': port[portbindings.HOST_ID],
'ptg_tenant': self.apic_manager.apic.fvTenant.name(
ptg_tenant),
'endpoint_group_name': str(endpoint_group_name),
'promiscuous_mode': is_port_promiscuous(port),
'extra_ips': [],
'floating_ip': [],
'ip_mapping': [],
# Put per mac-address extra info
'extra_details': {}}
if switched:
details['fixed_ips'] = port['fixed_ips']
if port['device_owner'].startswith('compute:') and port[
'device_id']:
vm = nclient.NovaClient().get_server(port['device_id'])
details['vm-name'] = vm.name if vm else port['device_id']
l3_policy = context._plugin.get_l3_policy(context,
l2p['l3_policy_id'])
own_addr = set()
if pt:
own_addr = set(self._get_owned_addresses(context,
pt['port_id']))
own_addr |= set(self._get_owned_addresses(context, port_id))
(details['floating_ip'], details['ip_mapping'],
details['host_snat_ips']) = (
self._get_ip_mapping_details(
context, port['id'], l3_policy, pt=pt,
owned_addresses=own_addr, host=kwargs['host']))
self._add_network_details(context, port, details, pt=pt,
owned=own_addr, inject_default_route=
l2p['inject_default_route'])
self._add_vrf_details(context, details)
if self._is_pt_chain_head(context, pt, ptg, owned_ips=own_addr):
# is a relevant proxy_gateway, push all the addresses from this
# chain to this PT
extra_map = details
master_mac = self._is_master_owner(context, pt,
owned_ips=own_addr)
if master_mac:
extra_map = details['extra_details'].setdefault(
master_mac, {'extra_ips': [], 'floating_ip': [],
'ip_mapping': [], 'host_snat_ips': []})
if bool(master_mac) == bool(pt['cluster_id']):
l3_policy = context._plugin.get_l3_policy(
context, l2p['l3_policy_id'])
while ptg['proxied_group_id']:
proxied = self.gbp_plugin.get_policy_target_group(
context, ptg['proxied_group_id'])
for port in self._get_ptg_ports(proxied):
extra_map['extra_ips'].extend(
[x['ip_address'] for x in port['fixed_ips']])
(fips, ipms, host_snat_ips) = (
self._get_ip_mapping_details(
context, port['id'], l3_policy,
host=kwargs['host']))
extra_map['floating_ip'].extend(fips)
if not extra_map['ip_mapping']:
extra_map['ip_mapping'].extend(ipms)
if not extra_map['host_snat_ips']:
extra_map['host_snat_ips'].extend(
host_snat_ips)
ptg = proxied
else:
LOG.info(_("Active master has changed for PT %s"),
pt['id'])
# There's no master mac even if a cluster_id is set.
# Active chain head must have changed in a concurrent
# operation, get out of here
pass
return self._get_gbp_details(context, **kwargs)
except Exception as e:
LOG.error(_("An exception has occurred while retrieving device "
"gbp details for %(device)s with error %(error)s"),
{'device': kwargs.get('device'), 'error': e.message})
LOG.error(_(
"An exception has occurred while retrieving device "
"gbp details for %s"), kwargs.get('device'))
LOG.exception(e)
details = {'device': kwargs.get('device')}
return details
# RPC Method
def request_gbp_details(self, context, **kwargs):
try:
LOG.debug("Request GBP details: %s", kwargs)
kwargs.update(kwargs['request'])
result = {'device': kwargs['device'],
'timestamp': kwargs['timestamp'],
'request_id': kwargs['request_id'],
'gbp_details': None,
'neutron_details': None}
result['gbp_details'] = self._get_gbp_details(context, **kwargs)
result['neutron_details'] = neu_rpc.RpcCallbacks(
None, None).get_device_details(context, **kwargs)
return result
except Exception as e:
LOG.error(_("An exception has occurred while requesting device "
"gbp details for %s"), kwargs.get('device'))
LOG.exception(e)
return None
def _allocate_snat_ip_for_host_and_ext_net(self, context, host, network,
es_name):
"""Allocate SNAT IP for a host for an external network."""

View File

@@ -97,7 +97,8 @@ class ApicMappingTestCase(
'tenant_network_types': ['opflex']
}
mock.patch('gbpservice.neutron.services.grouppolicy.drivers.cisco.'
'apic.apic_mapping.ApicMappingDriver._setup_rpc').start()
'apic.apic_mapping.ApicMappingDriver.'
'_setup_rpc_listeners').start()
host_agents = mock.patch('neutron.plugins.ml2.driver_context.'
'PortContext.host_agents').start()
host_agents.return_value = [self.agent_conf]
@@ -293,6 +294,11 @@ class TestPolicyTarget(ApicMappingTestCase):
mapping = self.driver.get_gbp_details(context.get_admin_context(),
device='tap%s' % pt1['port_id'], host='h1')
req_mapping = self.driver.request_gbp_details(
context.get_admin_context(),
request={'device': 'tap%s' % pt1['port_id'], 'host': 'h1',
'timestamp': 0, 'request_id': 'request_id'})
self.assertEqual(mapping, req_mapping['gbp_details'])
self.assertEqual(pt1['port_id'], mapping['port_id'])
self.assertEqual(ptg['id'], mapping['endpoint_group_name'])
self.assertEqual('someid', mapping['vm-name'])
@@ -321,6 +327,10 @@ class TestPolicyTarget(ApicMappingTestCase):
mapping['host_snat_ips'][0]['host_snat_ip'])
self.assertEqual(24, mapping['host_snat_ips'][0]['prefixlen'])
# Verify Neutron details
self.assertEqual(pt1['port_id'],
req_mapping['neutron_details']['port_id'])
# Create event on a second host to verify that the SNAT
# port gets created for this second host
pt2 = self.create_policy_target(
@@ -612,8 +622,15 @@ class TestPolicyTarget(ApicMappingTestCase):
details = self.driver.get_gbp_details(
context.get_admin_context(), device='tap%s' % 'randomid',
host='h1')
req_details = self.driver.request_gbp_details(
context.get_admin_context(),
request={'device': 'tap%s' % 'randomid', 'host': 'h1',
'timestamp': 0, 'request_id': 'request_id'})
# device was not found
self.assertEqual(None, details)
self.assertTrue('port_id' not in details)
self.assertEqual(details, req_details['gbp_details'])
self.assertTrue('port_id' not in req_details['neutron_details'])
ptg = self.create_policy_target_group()['policy_target_group']
pt1 = self.create_policy_target(
policy_target_group_id=ptg['id'])['policy_target']
@@ -622,8 +639,13 @@ class TestPolicyTarget(ApicMappingTestCase):
details = self.driver.get_gbp_details(
context.get_admin_context(), device='tap%s' % pt1['port_id'],
host='h1')
# device was not found
req_details = self.driver.request_gbp_details(
context.get_admin_context(),
request={'device': 'tap%s' % pt1['port_id'], 'host': 'h1',
'timestamp': 0, 'request_id': 'request_id'})
# An exception occurred
self.assertEqual({'device': 'tap%s' % pt1['port_id']}, details)
self.assertIsNone(req_details)
def test_get_gbp_proxy_details(self):
l3p_fake = self.create_l3_policy(name='myl3')['l3_policy']

View File

@@ -990,7 +990,16 @@ class TestProxyGroup(ApicMappingStitchingPlumberGBPTestCase):
self._get_object('subnets', proxy['subnets'][0], self.api,
expected_res_status=404)
def _test_get_gbp_details(self, admin_proxy=False):
def _test_get_gbp_details(self, admin_proxy=False, async=False):
def request_wrapper(*args, **kwargs):
kwargs['timestamp'] = 0
kwargs['request_id'] = 'some_id'
result = self.driver.request_gbp_details(*args, request=kwargs)
if result:
return result.get('gbp_details')
gbp_details = {False: self.driver.get_gbp_details,
True: request_wrapper}
ptg = self.create_policy_target_group(
name="ptg1")['policy_target_group']
pt1 = self.create_policy_target(
@@ -1037,7 +1046,7 @@ class TestProxyGroup(ApicMappingStitchingPlumberGBPTestCase):
def echo(name):
return name
self.mgr.apic.fvTenant.name = echo
mapping = self.driver.get_gbp_details(
mapping = gbp_details[async](
context.get_admin_context(),
device='tap%s' % proxy_gw['port_id'], host='h2')
@@ -1054,13 +1063,13 @@ class TestProxyGroup(ApicMappingStitchingPlumberGBPTestCase):
group_default_gateway=True,
tenant_id=ptg['tenant_id'])['policy_target']
self._bind_port_to_host(pt2['port_id'], 'h2')
mapping = self.driver.get_gbp_details(
mapping = gbp_details[async](
context.get_admin_context(),
device='tap%s' % group_default_gw['port_id'], host='h2')
self.assertTrue(mapping['promiscuous_mode'])
# No extra IPs for the failover since it doesn't own the master IP
mapping = self.driver.get_gbp_details(
mapping = gbp_details[async](
context.get_admin_context(),
device='tap%s' % proxy_gw_failover['port_id'], host='h2')
self.assertEqual(0, len(mapping['extra_ips'] or []))
@@ -1074,7 +1083,7 @@ class TestProxyGroup(ApicMappingStitchingPlumberGBPTestCase):
self.driver.ha_ip_handler.set_port_id_for_ha_ipaddress(
proxy_gw_failover['port_id'], x['ip_address'])
mapping = self.driver.get_gbp_details(
mapping = gbp_details[async](
context.get_admin_context(),
device='tap%s' % proxy_gw_failover['port_id'], host='h2')
self.assertEqual(
@@ -1092,6 +1101,12 @@ class TestProxyGroup(ApicMappingStitchingPlumberGBPTestCase):
def test_get_gbp_details_admin(self):
self._test_get_gbp_details(True)
def test_get_gbp_details_async(self):
self._test_get_gbp_details(False, True)
def test_get_gbp_details_admin_async(self):
self._test_get_gbp_details(True, True)
def test_cluster_promiscuous_mode(self):
ptg = self.create_policy_target_group(
name="ptg1")['policy_target_group']