[APIC-mapping] Support non-OpFlex networks

This change enables the apic-mapping driver to
handle endpoints (ports) when the network-type
is VLAN. In this scenario, there are no OpFlex
agents on the compute/network nodes that can configure
the datapath to isolate traffic and enforce policy.
Instead we use additional 'shadow' Neutron networks,
one per PTG, to isolate ports within a PTG from other
PTGs. Static path binding is configured on-demand on
APIC using the segmentation-ID of the shadow networks.

These additional "shadow" networks mirror the
configuration of the Neutron network created for
the L2-policy of the PTGs:
- The shadow networks are named using the convention
 apic_owned_ptg_<ptg-name>_<ptg-id>
- Subnets in the L2P network are reflected in shadow
networks of each PTG associated with the L2P. Their
names use the convention
 apic_owned_sub_<id of subnet in L2P network>
- Implicit ports created for policy-targets are mirrored
in the shadow network of the corresponding PTG. This
additional 'shadow' port is now associated with the
PT instead of the implicitly created port. These implicit
ports have device_owner set to 'apic' and device_id set
to the ID of the PT for which they were created.
- Explicit ports for PTs is allowed as long as they have
been created in the appropriate shadow network
- DHCP ports on the shadow networks are disabled to avoid
unwanted/wrong responses to DHCP requests from VMs.

This scheme imposes a few restrictions:
- PTG of a PT can be no longer updated because that would
require changing the network of the associated port
- Ports created in the L2P network cannot be associated
with a PT; instead such an explicit port must be created
in the shadow network for the PTG of the PT.

Without OpFlex agent, NAT-ing won't work with APIC. So
the apic-mapping driver configures external-segments on
APIC for the no-NAT case. Once support for edge-NAT is
available (e.g. through ASR integration), we can make
the appropriate changes to make floating-IPs and PAT
work.
Service-chaining, which requires special handling of
endpoints in the OpFlex agent, is not expected to work
out-of-the-box with non-OpFlex network.

Closes-Bug: 1581264

Change-Id: I39d09257f6eadcf5ae527d779f13493415a4ec85
Signed-off-by: Amit Bose <amitbose@gmail.com>
(cherry picked from commit ba62041267)
(cherry picked from commit 971ed58eee)
This commit is contained in:
Amit Bose
2016-05-06 14:41:52 -07:00
parent 5046dc0d00
commit 7c3eb7b8eb
5 changed files with 831 additions and 63 deletions

View File

@@ -90,7 +90,7 @@ class ApiManagerMixin(object):
def _update_resource(
self, id, type, expected_res_status=None, is_admin_context=False,
**kwargs):
api=None, **kwargs):
plural = cm.get_resource_plural(type)
data = {type: kwargs}
tenant_id = kwargs.pop('tenant_id', self._tenant_id)
@@ -99,7 +99,7 @@ class ApiManagerMixin(object):
req.environ['neutron.context'] = context.Context(
'', tenant_id if not is_admin_context else self._tenant_id,
is_admin_context)
res = req.get_response(self.ext_api)
res = req.get_response(api or self.ext_api)
if expected_res_status:
self.assertEqual(res.status_int, expected_res_status)
@@ -143,6 +143,23 @@ class ApiManagerMixin(object):
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(self.fmt, res)
def _list_resource(self, plural, api, is_admin_context=False,
expected_res_status=None, tenant_id=None,
**kwargs):
param_str = '&'.join(['%s=%s' % (k, v)
for k, v in kwargs.iteritems()])
req = self.new_list_request(plural, self.fmt,
params=param_str or None)
req.environ['neutron.context'] = context.Context(
'', tenant_id or self._tenant_id, is_admin_context)
res = req.get_response(api)
if expected_res_status:
self.assertEqual(res.status_int, expected_res_status)
elif res.status_int >= webob.exc.HTTPClientError.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(self.fmt, res)
def _bind_port_to_host(self, port_id, host, data=None):
plugin = manager.NeutronManager.get_plugin()
ctx = context.get_admin_context()

View File

@@ -94,15 +94,17 @@ class ApicMappingTestCase(
mocked.ControllerMixin, mocked.ConfigMixin):
def setUp(self, sc_plugin=None, nat_enabled=True,
pre_existing_l3out=False):
self.agent_conf = AGENT_CONF
pre_existing_l3out=False, default_agent_conf=True,
ml2_options=None):
if default_agent_conf:
self.agent_conf = AGENT_CONF
cfg.CONF.register_opts(sg_cfg.security_group_opts, 'SECURITYGROUP')
config.cfg.CONF.set_override('enable_security_group', False,
group='SECURITYGROUP')
n_rpc.create_connection = mock.Mock()
amap.ApicMappingDriver.get_apic_manager = mock.MagicMock()
self.set_up_mocks()
ml2_opts = {
ml2_opts = ml2_options or {
'mechanism_drivers': ['apic_gbp'],
'type_drivers': ['opflex'],
'tenant_network_types': ['opflex']
@@ -150,6 +152,7 @@ class ApicMappingTestCase(
self.nat_enabled = nat_enabled
self.driver.l3out_vlan_alloc = mock.Mock()
self.pre_l3out = pre_existing_l3out
self.non_apic_network = False
def echo2(string):
return string
@@ -276,11 +279,42 @@ l3extRsPathL3OutAtt": {"attributes": {"ifInstT": "sub-interface", "encap": \
port_id, host, data=data)
class ApicMappingVlanTestCase(ApicMappingTestCase):
def setUp(self, **kwargs):
config.cfg.CONF.set_override(
'network_vlan_ranges', ['physnet1:100:200'], group='ml2_type_vlan')
kwargs['ml2_options'] = {
'mechanism_drivers': ['apic_gbp', 'openvswitch'],
'type_drivers': ['vlan'],
'tenant_network_types': ['vlan']
}
kwargs['default_agent_conf'] = False
super(ApicMappingVlanTestCase, self).setUp(**kwargs)
self.non_apic_network = True
def _get_ptg_shadow_net(self, ptg):
net = self._list_resource('networks', self.api,
tenant_id=ptg['tenant_id'],
name=self.driver._get_ptg_shadow_network_name(ptg))
net = net['networks']
if net:
return net[0]
def _get_ptg_shadow_subnet(self, ptg):
shadow_net = self._get_ptg_shadow_net(ptg)
if shadow_net:
return shadow_net['subnets'][0]
class TestPolicyTarget(ApicMappingTestCase):
def test_policy_target_port_deleted_on_apic(self):
ptg = self.create_policy_target_group()['policy_target_group']
subnet = self._get_object('subnets', ptg['subnets'][0], self.api)
subnet = self._get_object('subnets',
self._get_ptg_shadow_subnet(ptg) if self.non_apic_network
else ptg['subnets'][0],
self.api)
with self.port(subnet=subnet) as port:
self._bind_port_to_host(port['port']['id'], 'h1')
pt = self.create_policy_target(
@@ -290,7 +324,10 @@ class TestPolicyTarget(ApicMappingTestCase):
def test_policy_target_delete_no_port(self):
ptg = self.create_policy_target_group()['policy_target_group']
subnet = self._get_object('subnets', ptg['subnets'][0], self.api)
subnet = self._get_object('subnets',
self._get_ptg_shadow_subnet(ptg) if self.non_apic_network
else ptg['subnets'][0],
self.api)
with self.port(subnet=subnet) as port:
self._bind_port_to_host(port['port']['id'], 'h1')
pt = self.create_policy_target(
@@ -312,7 +349,10 @@ class TestPolicyTarget(ApicMappingTestCase):
# No notification needed
self.assertFalse(self.driver.notifier.port_update.called)
self.driver.notifier.port_update.reset_mock()
subnet = self._get_object('subnets', ptg['subnets'][0], self.api)
subnet = self._get_object('subnets',
self._get_ptg_shadow_subnet(ptg) if self.non_apic_network
else ptg['subnets'][0],
self.api)
with self.port(subnet=subnet) as port:
# Create EP with bound port
port = self._bind_port_to_host(port['port']['id'], 'h1')
@@ -368,7 +408,9 @@ class TestPolicyTarget(ApicMappingTestCase):
self.assertEqual('someid', mapping['vm-name'])
self.assertTrue(mapping['enable_dhcp_optimization'])
self.assertEqual(1, len(mapping['subnets']))
self.assertEqual(ptg['subnets'][0], mapping['subnets'][0]['id'])
subnet = self._get_object('subnets', ptg['subnets'][0], self.api)
self.assertEqual(subnet['subnet']['cidr'],
mapping['subnets'][0]['cidr'])
self.assertEqual(1, len(mapping['floating_ip']))
fip = mapping['floating_ip'][0]
self.assertEqual(pt1['port_id'], fip['port_id'])
@@ -460,6 +502,9 @@ class TestPolicyTarget(ApicMappingTestCase):
l3_policy_id=l3p['id'])['l2_policy']
ptg = self.create_policy_target_group(
name="ptg1", l2_policy_id=l2p['id'])['policy_target_group']
net_id = (self._get_ptg_shadow_net(ptg)['id']
if self.non_apic_network else l2p['network_id'])
pt1 = self.create_policy_target(
policy_target_group_id=ptg['id'])['policy_target']
pt2 = self.create_policy_target(
@@ -474,7 +519,7 @@ class TestPolicyTarget(ApicMappingTestCase):
self.driver.ip_address_owner_update(context.get_admin_context(),
ip_owner_info=ip_owner_info, host='h1')
obj = self.driver.ha_ip_handler.get_port_for_ha_ipaddress(
'1.2.3.4', l2p['network_id'])
'1.2.3.4', net_id)
self.assertEqual(pt1['port_id'], obj['port_id'])
self.driver._notify_port_update.assert_called_with(mock.ANY,
pt1['port_id'])
@@ -485,7 +530,7 @@ class TestPolicyTarget(ApicMappingTestCase):
self.driver.ip_address_owner_update(context.get_admin_context(),
ip_owner_info=ip_owner_info, host='h2')
obj = self.driver.ha_ip_handler.get_port_for_ha_ipaddress(
'1.2.3.4', l2p['network_id'])
'1.2.3.4', net_id)
self.assertEqual(pt2['port_id'], obj['port_id'])
exp_calls = [
mock.call(mock.ANY, pt1['port_id']),
@@ -508,6 +553,12 @@ class TestPolicyTarget(ApicMappingTestCase):
self.api)
with self.port(subnet=sub, device_owner='network:dhcp',
tenant_id='onetenant') as dhcp:
if self.non_apic_network:
shadow_sub = self._get_object('subnets',
self._get_ptg_shadow_subnet(ptg), self.api)
with self.port(subnet=shadow_sub, tenant_id='onetenant',
device_owner='network:dhcp'):
pass
dhcp = dhcp['port']
details = self.driver.get_gbp_details(
context.get_admin_context(),
@@ -558,6 +609,12 @@ class TestPolicyTarget(ApicMappingTestCase):
with self.port(subnet=sub, device_owner='network:dhcp',
tenant_id='onetenant') as dhcp:
if self.non_apic_network:
shadow_sub = self._get_object('subnets',
self._get_ptg_shadow_subnet(ptg), self.api)
with self.port(subnet=shadow_sub, tenant_id='onetenant',
device_owner='network:dhcp'):
pass
dhcp = dhcp['port']
details = self.driver.get_gbp_details(
context.get_admin_context(),
@@ -647,6 +704,13 @@ class TestPolicyTarget(ApicMappingTestCase):
with self.port(subnet=sub, device_owner='network:dhcp',
tenant_id='onetenant') as dhcp:
if self.non_apic_network:
shadow_sub = self._get_object('subnets',
self._get_ptg_shadow_subnet(ptg), self.api)
with self.port(subnet=shadow_sub, tenant_id='onetenant',
device_owner='network:dhcp'):
pass
dhcp = dhcp['port']
details = self.driver.get_gbp_details(
context.get_admin_context(),
@@ -752,18 +816,21 @@ class TestPolicyTarget(ApicMappingTestCase):
def test_explicit_port(self):
with self.network() as net:
with self.subnet(network=net) as sub:
l2p = self.create_l2_policy(
network_id=net['network']['id'])['l2_policy']
ptg = self.create_policy_target_group(
l2_policy_id=l2p['id'])['policy_target_group']
if self.non_apic_network:
sub = self._get_object('subnets',
self._get_ptg_shadow_subnet(ptg), self.api)
with self.port(subnet=sub) as port:
self._bind_port_to_host(port['port']['id'], 'h1')
l2p = self.create_l2_policy(
network_id=net['network']['id'])['l2_policy']
ptg = self.create_policy_target_group(
l2_policy_id=l2p['id'])['policy_target_group']
self.create_policy_target(
port_id=port['port']['id'],
policy_target_group_id=ptg['id'])
self.assertTrue(self.driver.notifier.port_update.called)
def test_port_notified_on_changed_ptg(self):
def test_port_update_changed_ptg(self):
ptg = self.create_policy_target_group()['policy_target_group']
ptg2 = self.create_policy_target_group(
l2_policy_id=ptg['l2_policy_id'])['policy_target_group']
@@ -771,9 +838,17 @@ class TestPolicyTarget(ApicMappingTestCase):
policy_target_group_id=ptg['id'])['policy_target']
self._bind_port_to_host(pt['port_id'], 'h1')
self.driver.notifier.port_update.reset_mock()
self.update_policy_target(pt['id'], policy_target_group_id=ptg2['id'])
self.assertTrue(self.driver.notifier.port_update.called)
if not self.non_apic_network:
self.driver.notifier.port_update.reset_mock()
self.update_policy_target(pt['id'],
policy_target_group_id=ptg2['id'])
self.assertTrue(self.driver.notifier.port_update.called)
else:
res = self.update_policy_target(pt['id'],
policy_target_group_id=ptg2['id'],
expected_res_status=400)
self.assertEqual('PTGChangeDisallowedWithNonOpFlexNetwork',
res['NeutronError']['type'])
def test_update_ptg_failed(self):
ptg = self.create_policy_target_group()['policy_target_group']
@@ -784,7 +859,9 @@ class TestPolicyTarget(ApicMappingTestCase):
res = self.update_policy_target(
pt['id'], policy_target_group_id=ptg2['id'],
expected_res_status=400)
self.assertEqual('InvalidPortForPTG', res['NeutronError']['type'])
exp = ('PTGChangeDisallowedWithNonOpFlexNetwork'
if self.non_apic_network else 'InvalidPortForPTG')
self.assertEqual(exp, res['NeutronError']['type'])
def test_port_notified_on_subnet_change(self):
ptg = self.create_policy_target_group()['policy_target_group']
@@ -795,6 +872,8 @@ class TestPolicyTarget(ApicMappingTestCase):
subnet = self._get_object('subnets', ptg['subnets'][0], self.api)
subnet2 = copy.deepcopy(subnet)
subnet2['subnet']['gateway_ip'] = '10.0.0.254'
subnet2['subnet']['allocation_pools'] = [{
'start': '10.0.0.2', 'end': '10.0.0.250'}]
self.driver.apic_manager.reset_mock()
self.driver.notifier.port_update.reset_mock()
@@ -874,6 +953,246 @@ class TestPolicyTarget(ApicMappingTestCase):
self.assertEqual('1.1.1.1', entries[1].ha_ip_address)
class TestPolicyTargetVlanNetwork(ApicMappingVlanTestCase,
TestPolicyTarget):
def test_shadow_port(self):
ptg1 = self.create_policy_target_group(
name="ptg1")['policy_target_group']
pt1 = self.create_policy_target(
policy_target_group_id=ptg1['id'])['policy_target']
shadow_port = self._get_object('ports', pt1['port_id'],
self.api)['port']
subnet = self._get_object('subnets', ptg1['subnets'][0], self.api)
ports = self._list_resource('ports',
self.api, network_id=subnet['subnet']['network_id'])['ports']
self.assertEqual(1, len(ports))
self.assertEqual(shadow_port['mac_address'], ports[0]['mac_address'])
self.assertEqual(len(shadow_port['fixed_ips']),
len(ports[0]['fixed_ips']))
self.assertEqual(shadow_port['fixed_ips'][0]['ip_address'],
ports[0]['fixed_ips'][0]['ip_address'])
self.delete_policy_target(pt1['id'])
self._get_object('ports', pt1['port_id'], self.api,
expected_res_status=404)
self._get_object('ports', ports[0]['id'], self.api,
expected_res_status=404)
def test_shadow_port_for_explicit_port(self):
ptg1 = self.create_policy_target_group()['policy_target_group']
shadow_subnet1 = self._get_object('subnets',
self._get_ptg_shadow_subnet(ptg1),
self.api)
subnet = self._get_object('subnets', ptg1['subnets'][0], self.api)
with self.port(subnet=shadow_subnet1) as p:
port1 = p['port']
pt1 = self.create_policy_target(policy_target_group_id=ptg1['id'],
port_id=port1['id'])['policy_target']
subnet = self._get_object('subnets', ptg1['subnets'][0], self.api)
ports = self._list_resource('ports',
self.api, network_id=subnet['subnet']['network_id'])['ports']
self.assertEqual(1, len(ports))
self.assertEqual(port1['mac_address'], ports[0]['mac_address'])
self.assertEqual(len(port1['fixed_ips']),
len(ports[0]['fixed_ips']))
self.assertEqual(port1['fixed_ips'][0]['ip_address'],
ports[0]['fixed_ips'][0]['ip_address'])
self.delete_policy_target(pt1['id'])
self._get_object('ports', pt1['port_id'], self.api,
expected_res_status=200)
self._get_object('ports', ports[0]['id'], self.api,
expected_res_status=404)
def test_explicit_port_wrong_network(self):
ptg1 = self.create_policy_target_group()['policy_target_group']
subnet = self._get_object('subnets', ptg1['subnets'][0], self.api)
with self.port(subnet=subnet) as port1:
res = self.create_policy_target(policy_target_group_id=ptg1['id'],
port_id=port1['port']['id'], expected_res_status=400)
self.assertEqual('ExplicitPortInWrongNetwork',
res['NeutronError']['type'])
def test_explicit_port_overlap_address(self):
ptg1 = self.create_policy_target_group(
name="ptg1")['policy_target_group']
subnet = self._get_object('subnets', ptg1['subnets'][0], self.api)
shadow_subnet1 = self._get_object('subnets',
self._get_ptg_shadow_subnet(ptg1),
self.api)
with self.port(subnet=shadow_subnet1) as p:
shadow_port1 = p
ips = shadow_port1['port']['fixed_ips']
ips[0].pop('subnet_id', None)
with self.port(subnet=subnet, fixed_ips=ips) as p:
res = self.create_policy_target(
policy_target_group_id=ptg1['id'],
port_id=shadow_port1['port']['id'], expected_res_status=400)
self.assertEqual('ExplicitPortOverlap',
res['NeutronError']['type'])
res = self.new_delete_request('ports', p['port']['id'],
self.fmt).get_response(self.api)
self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
with self.port(subnet=subnet,
mac_address=shadow_port1['port']['mac_address']) as p:
res = self.create_policy_target(
policy_target_group_id=ptg1['id'],
port_id=shadow_port1['port']['id'], expected_res_status=400)
self.assertEqual('ExplicitPortOverlap',
res['NeutronError']['type'])
def test_path_static_binding_implicit_port(self):
mgr = self.driver.apic_manager
ptg1 = self.create_policy_target_group(
name="ptg1")['policy_target_group']
pt1 = self.create_policy_target(
policy_target_group_id=ptg1['id'])['policy_target']
self._bind_port_to_host(pt1['port_id'], 'h1')
port_ctx = self.driver._core_plugin.get_bound_port_context(
context.get_admin_context(), pt1['port_id'])
seg_id = port_ctx.bottom_bound_segment['segmentation_id']
mgr.ensure_path_created_for_port.assert_called_once_with(
ptg1['tenant_id'], ptg1['id'], 'h1', seg_id,
bd_name=ptg1['l2_policy_id'])
# move port to different host
mgr.ensure_path_created_for_port.reset_mock()
self._bind_port_to_host(pt1['port_id'], 'h2')
mgr.ensure_path_deleted_for_port.assert_called_once_with(
ptg1['tenant_id'], ptg1['id'], 'h1')
mgr.ensure_path_created_for_port.assert_called_once_with(
ptg1['tenant_id'], ptg1['id'], 'h2', seg_id,
bd_name=ptg1['l2_policy_id'])
# create another PT, bind to same host and then delete it
mgr.ensure_path_created_for_port.reset_mock()
mgr.ensure_path_deleted_for_port.reset_mock()
pt2 = self.create_policy_target(
policy_target_group_id=ptg1['id'])['policy_target']
self._bind_port_to_host(pt2['port_id'], 'h2')
mgr.ensure_path_created_for_port.assert_called_once_with(
ptg1['tenant_id'], ptg1['id'], 'h2', seg_id,
bd_name=ptg1['l2_policy_id'])
self.delete_policy_target(pt2['id'])
mgr.ensure_path_deleted_for_port.assert_not_called()
# delete PT
mgr.ensure_path_deleted_for_port.reset_mock()
self.delete_policy_target(pt1['id'])
mgr.ensure_path_deleted_for_port.assert_called_once_with(
ptg1['tenant_id'], ptg1['id'], 'h2')
def test_path_static_binding_explicit_port(self):
mgr = self.driver.apic_manager
ptg1 = self.create_policy_target_group(
name="ptg1")['policy_target_group']
shadow_subnet1 = self._get_object('subnets',
self._get_ptg_shadow_subnet(ptg1),
self.api)
with self.port(subnet=shadow_subnet1) as port:
port1 = port
port1 = self._bind_port_to_host(port1['port']['id'], 'h1')
port_ctx = self.driver._core_plugin.get_bound_port_context(
context.get_admin_context(), port1['port']['id'])
seg_id = port_ctx.bottom_bound_segment['segmentation_id']
mgr.ensure_path_created_for_port.assert_not_called()
# Assign port to a PT
pt1 = self.create_policy_target(
policy_target_group_id=ptg1['id'],
port_id=port1['port']['id'])['policy_target']
mgr.ensure_path_created_for_port.assert_called_once_with(
ptg1['tenant_id'], ptg1['id'], 'h1', seg_id,
bd_name=ptg1['l2_policy_id'])
# move port to different host
mgr.ensure_path_created_for_port.reset_mock()
self._bind_port_to_host(pt1['port_id'], 'h2')
mgr.ensure_path_deleted_for_port.assert_called_once_with(
ptg1['tenant_id'], ptg1['id'], 'h1')
mgr.ensure_path_created_for_port.assert_called_once_with(
ptg1['tenant_id'], ptg1['id'], 'h2', seg_id,
bd_name=ptg1['l2_policy_id'])
# create another port & PT, bind to same host and then delete port
mgr.ensure_path_created_for_port.reset_mock()
mgr.ensure_path_deleted_for_port.reset_mock()
with self.port(subnet=shadow_subnet1) as port:
port2 = port
pt2 = self.create_policy_target(
policy_target_group_id=ptg1['id'],
port_id=port2['port']['id'])['policy_target']
self._bind_port_to_host(pt2['port_id'], 'h2')
mgr.ensure_path_created_for_port.assert_called_once_with(
ptg1['tenant_id'], ptg1['id'], 'h2', seg_id,
bd_name=ptg1['l2_policy_id'])
res = self.new_delete_request('ports', port2['port']['id'],
self.fmt).get_response(self.api)
self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
mgr.ensure_path_deleted_for_port.assert_not_called()
# Delete PT
mgr.ensure_path_deleted_for_port.reset_mock()
self.delete_policy_target(pt1['id'])
mgr.ensure_path_deleted_for_port.assert_called_once_with(
ptg1['tenant_id'], ptg1['id'], 'h2')
def test_path_static_binding_for_non_pt(self):
mgr = self.driver.apic_manager
ptg1 = self.create_policy_target_group(
name="ptg1")['policy_target_group']
subnet = self._get_object('subnets', ptg1['subnets'][0], self.api)
with self.port(subnet=subnet) as port:
port1 = port
with self.port(subnet=subnet) as port:
port2 = port
# bind first port
port1 = self._bind_port_to_host(port1['port']['id'], 'h1')
port_ctx = self.driver._core_plugin.get_bound_port_context(
context.get_admin_context(), port1['port']['id'])
seg_id = port_ctx.bottom_bound_segment['segmentation_id']
mgr.ensure_path_created_for_port.assert_called_once_with(
ptg1['tenant_id'], 'Shd-%s' % ptg1['l2_policy_id'], 'h1',
seg_id, bd_name=ptg1['l2_policy_id'])
# bind second port
mgr.ensure_path_created_for_port.reset_mock()
port2 = self._bind_port_to_host(port2['port']['id'], 'h1')
mgr.ensure_path_created_for_port.assert_called_once_with(
ptg1['tenant_id'], 'Shd-%s' % ptg1['l2_policy_id'], 'h1',
seg_id, bd_name=ptg1['l2_policy_id'])
# delete second port
res = self.new_delete_request('ports', port2['port']['id'],
self.fmt).get_response(self.api)
self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
mgr.ensure_path_deleted_for_port.assert_not_called()
# delete first port
mgr.ensure_path_deleted_for_port.reset_mock()
res = self.new_delete_request('ports', port1['port']['id'],
self.fmt).get_response(self.api)
self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
mgr.ensure_path_deleted_for_port.assert_called_once_with(
ptg1['tenant_id'], 'Shd-%s' % ptg1['l2_policy_id'], 'h1')
class FakeNetworkContext(object):
"""To generate network context for testing purposes only."""
@@ -909,6 +1228,7 @@ class FakePortContext(object):
self.original = self._port
self.network = self._network
self.top_bound_segment = self._bound_segment
self.bottom_bound_segment = self._bound_segment
self.host = self._port.get(portbindings.HOST_ID)
self.original_host = None
self._binding = mock.Mock()
@@ -1359,12 +1679,14 @@ class TestPolicyTargetGroup(ApicMappingTestCase):
self.assertNotEqual(sub_ptg_1, sub_ptg_2)
self.assertFalse(sub_ptg_1 & sub_ptg_2)
def _create_explicit_subnet_ptg(self, cidr, shared=False):
def _create_explicit_subnet_ptg(self, cidr, shared=False, alloc_pool=None):
l2p = self.create_l2_policy(name="l2p", shared=shared)
l2p_id = l2p['l2_policy']['id']
network_id = l2p['l2_policy']['network_id']
network = self._get_object('networks', network_id, self.api)
with self.subnet(network=network, cidr=cidr):
pool = alloc_pool or [{'start': '10.0.0.2', 'end': '10.0.0.250'}]
with self.subnet(network=network, cidr=cidr,
allocation_pools=pool):
# The subnet creation in the proper network causes the subnet ID
# to be added to the PTG
return self.create_policy_target_group(
@@ -1372,6 +1694,103 @@ class TestPolicyTargetGroup(ApicMappingTestCase):
shared=shared)['policy_target_group']
class TestPolicyTargetGroupVlanNetwork(ApicMappingVlanTestCase,
TestPolicyTargetGroup):
def _test_shadow_network(self, shared):
ptg1 = self.create_policy_target_group(
name='ptg1', shared=shared)['policy_target_group']
l2p = self.show_l2_policy(ptg1['l2_policy_id'])['l2_policy']
net = self._get_object('networks', l2p['network_id'],
self.api)['network']
subnet1 = self._get_object('subnets', net['subnets'][0],
self.api)['subnet']
shadow_net1 = self._get_ptg_shadow_net(ptg1)
self.assertIsNotNone(shadow_net1)
self.assertEqual(ptg1['tenant_id'], shadow_net1['tenant_id'])
self.assertEqual(shared, shadow_net1['shared'])
self.assertEqual(1, len(shadow_net1['subnets']))
shadow_subnet1 = self._get_object('subnets',
shadow_net1['subnets'][0], self.api)['subnet']
self.assertEqual(subnet1['cidr'], shadow_subnet1['cidr'])
self.assertEqual(ptg1['tenant_id'], shadow_subnet1['tenant_id'])
self.delete_policy_target_group(ptg1['id'])
self._get_object('subnets', shadow_subnet1['id'], self.api,
expected_res_status=404)
self._get_object('networks', shadow_net1['id'], self.api,
expected_res_status=404)
def test_shadow_network(self):
self._test_shadow_network(False)
def test_shadow_network_shared(self):
self._test_shadow_network(True)
def _test_shadow_subnet(self, shared):
ptg1 = self.create_policy_target_group(
name='ptg1', shared=shared)['policy_target_group']
l2p = self.show_l2_policy(ptg1['l2_policy_id'])['l2_policy']
net = self._get_object('networks', l2p['network_id'],
self.api)['network']
subnet1 = self._get_object('subnets', net['subnets'][0],
self.api)['subnet']
shadow_net1 = self._get_ptg_shadow_net(ptg1)
with self.subnet(cidr='20.0.0.0/26',
network={'network': net}) as subnet2:
subnet2 = subnet2['subnet']
shadow_subnets = self._list_resource(
'subnets', self.api, network_id=shadow_net1['id'])['subnets']
shadow_subnets = sorted(shadow_subnets, key=lambda x: x['cidr'])
self.assertEqual(2, len(shadow_subnets))
self.assertEqual(subnet1['cidr'], shadow_subnets[0]['cidr'])
self.assertEqual(subnet2['cidr'], shadow_subnets[1]['cidr'])
self.assertTrue(shadow_subnets[0]['enable_dhcp'])
self.assertTrue(shadow_subnets[1]['enable_dhcp'])
subnet1 = self._update_resource(subnet1['id'], 'subnet',
expected_res_status=200, api=self.api,
enable_dhcp=False)['subnet']
self.assertFalse(subnet1['enable_dhcp'])
shadow_subnets = self._list_resource(
'subnets', self.api, network_id=shadow_net1['id'])['subnets']
shadow_subnets = sorted(shadow_subnets, key=lambda x: x['cidr'])
self.assertFalse(shadow_subnets[0]['enable_dhcp'])
self.delete_policy_target_group(ptg1['id'])
shadow_subnets = self._list_resource('subnets', self.api,
network_id=shadow_net1['id'], expected_res_status=200)['subnets']
self.assertEqual([], shadow_subnets)
def test_shadow_subnet(self):
self._test_shadow_subnet(False)
def test_shadow_subnet_shared(self):
self._test_shadow_subnet(True)
def test_dhcp_port_disabled_in_shadow(self):
ptg1 = self.create_policy_target_group(
name='ptg1')['policy_target_group']
shadow_net1 = self._get_ptg_shadow_net(ptg1)
shadow_subnet1 = self._get_object('subnets',
shadow_net1['subnets'][0], self.api)
with self.port(subnet=shadow_subnet1,
device_owner='network:dhcp') as port:
port = self._get_object('ports', port['port']['id'], self.api)
self.assertFalse(port['port']['admin_state_up'])
self._update_resource(port['port']['id'], 'port',
expected_res_status=200, api=self.api,
admin_state_up=True)
port = self._get_object('ports', port['port']['id'], self.api)
self.assertFalse(port['port']['admin_state_up'])
class TestL2Policy(ApicMappingTestCase):
def _test_l2_policy_created_on_apic(self, shared=False):