You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
2891 lines
141 KiB
2891 lines
141 KiB
# Copyright (c) 2012 OpenStack Foundation. |
|
# |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
# you may not use this file except in compliance with the License. |
|
# You may obtain a copy of the License at |
|
# |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
# |
|
# Unless required by applicable law or agreed to in writing, software |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
|
# implied. |
|
# See the License for the specific language governing permissions and |
|
# limitations under the License. |
|
|
|
import contextlib |
|
from eventlet import greenthread |
|
import mock |
|
from neutron.api.v2 import attributes |
|
from neutron.common import constants |
|
from neutron.common import exceptions as n_exc |
|
from neutron.common import ipv6_utils |
|
from neutron import context |
|
from neutron.extensions import dvr as dist_router |
|
from neutron.extensions import external_net |
|
from neutron.extensions import l3 |
|
from neutron.extensions import l3_ext_gw_mode |
|
from neutron.extensions import portbindings |
|
from neutron.extensions import portsecurity as psec |
|
from neutron.extensions import providernet as pnet |
|
from neutron.extensions import securitygroup as secgrp |
|
from neutron import manager |
|
|
|
from neutron.tests.unit import _test_extension_portbindings as test_bindings |
|
import neutron.tests.unit.db.test_allowedaddresspairs_db as test_addr_pair |
|
import neutron.tests.unit.db.test_db_base_plugin_v2 as test_plugin |
|
import neutron.tests.unit.extensions.test_l3 as test_l3_plugin |
|
import neutron.tests.unit.extensions.test_l3_ext_gw_mode as test_ext_gw_mode |
|
import neutron.tests.unit.extensions.test_portsecurity as test_psec |
|
import neutron.tests.unit.extensions.test_securitygroup as ext_sg |
|
from neutron.tests.unit import testlib_api |
|
from oslo_config import cfg |
|
from oslo_db import exception as db_exc |
|
from oslo_utils import uuidutils |
|
import six |
|
import webob.exc |
|
|
|
from vmware_nsx.common import nsx_constants |
|
from vmware_nsx.db import nsxv_db |
|
from vmware_nsx.extensions import ( |
|
routertype as router_type) |
|
from vmware_nsx.extensions import ( |
|
vnicindex as ext_vnic_idx) |
|
from vmware_nsx.plugins.nsx_v.vshield.common import ( |
|
constants as vcns_const) |
|
from vmware_nsx.plugins.nsx_v.vshield import edge_utils |
|
from vmware_nsx.tests.unit import vmware |
|
from vmware_nsx.tests.unit.vmware.extensions import test_vnic_index |
|
from vmware_nsx.tests.unit.vmware import test_utils |
|
from vmware_nsx.tests.unit.vmware.vshield import fake_vcns |
|
|
|
PLUGIN_NAME = 'vmware_nsx.plugin.NsxVPlugin' |
|
|
|
_uuid = uuidutils.generate_uuid |
|
|
|
|
|
class NsxVPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase): |
|
|
|
def _create_network(self, fmt, name, admin_state_up, |
|
arg_list=None, providernet_args=None, **kwargs): |
|
data = {'network': {'name': name, |
|
'admin_state_up': admin_state_up, |
|
'tenant_id': self._tenant_id}} |
|
# Fix to allow the router:external attribute and any other |
|
# attributes containing a colon to be passed with |
|
# a double underscore instead |
|
kwargs = dict((k.replace('__', ':'), v) for k, v in kwargs.items()) |
|
if external_net.EXTERNAL in kwargs: |
|
arg_list = (external_net.EXTERNAL, ) + (arg_list or ()) |
|
|
|
attrs = kwargs |
|
if providernet_args: |
|
attrs.update(providernet_args) |
|
for arg in (('admin_state_up', 'tenant_id', 'shared') + |
|
(arg_list or ())): |
|
# Arg must be present and not empty |
|
if arg in kwargs: |
|
data['network'][arg] = kwargs[arg] |
|
network_req = self.new_create_request('networks', data, fmt) |
|
if (kwargs.get('set_context') and 'tenant_id' in kwargs): |
|
# create a specific auth context for this request |
|
network_req.environ['neutron.context'] = context.Context( |
|
'', kwargs['tenant_id']) |
|
return network_req.get_response(self.api) |
|
|
|
def setUp(self, |
|
plugin=PLUGIN_NAME, |
|
ext_mgr=None, |
|
service_plugins=None): |
|
test_utils.override_nsx_ini_test() |
|
mock_vcns = mock.patch(vmware.VCNS_NAME, autospec=True) |
|
mock_vcns_instance = mock_vcns.start() |
|
self.fc2 = fake_vcns.FakeVcns() |
|
mock_vcns_instance.return_value = self.fc2 |
|
edge_utils.query_dhcp_service_config = mock.Mock(return_value=[]) |
|
self.mock_create_dhcp_service = mock.patch("%s.%s" % ( |
|
vmware.EDGE_MANAGE_NAME, 'create_dhcp_edge_service')) |
|
self.mock_create_dhcp_service.start() |
|
mock_update_dhcp_service = mock.patch("%s.%s" % ( |
|
vmware.EDGE_MANAGE_NAME, 'update_dhcp_edge_service')) |
|
mock_update_dhcp_service.start() |
|
mock_delete_dhcp_service = mock.patch("%s.%s" % ( |
|
vmware.EDGE_MANAGE_NAME, 'delete_dhcp_edge_service')) |
|
mock_delete_dhcp_service.start() |
|
super(NsxVPluginV2TestCase, self).setUp(plugin=plugin, |
|
ext_mgr=ext_mgr) |
|
self.addCleanup(self.fc2.reset_all) |
|
plugin_instance = manager.NeutronManager.get_plugin() |
|
plugin_instance._get_edge_id_by_rtr_id = mock.Mock() |
|
plugin_instance._get_edge_id_by_rtr_id.return_value = False |
|
|
|
def test_get_vlan_network_name(self): |
|
p = manager.NeutronManager.get_plugin() |
|
id = uuidutils.generate_uuid() |
|
net = {'name': '', |
|
'id': id} |
|
expected = id |
|
self.assertEqual(expected, |
|
p._get_vlan_network_name(net)) |
|
net = {'name': 'pele', |
|
'id': id} |
|
expected = '%s-%s' % ('pele', id) |
|
self.assertEqual(expected, |
|
p._get_vlan_network_name(net)) |
|
name = 'X' * 500 |
|
net = {'name': name, |
|
'id': id} |
|
expected = '%s-%s' % (name[:43], id) |
|
self.assertEqual(expected, |
|
p._get_vlan_network_name(net)) |
|
|
|
def test_create_port_anticipating_allocation(self): |
|
with self.network(shared=True) as network: |
|
with self.subnet(network=network, cidr='10.0.0.0/24') as subnet: |
|
fixed_ips = [{'subnet_id': subnet['subnet']['id']}, |
|
{'subnet_id': subnet['subnet']['id'], |
|
'ip_address': '10.0.0.3'}] |
|
self._create_port(self.fmt, network['network']['id'], |
|
webob.exc.HTTPCreated.code, |
|
fixed_ips=fixed_ips) |
|
|
|
|
|
class TestNetworksV2(test_plugin.TestNetworksV2, NsxVPluginV2TestCase): |
|
|
|
def test_create_network_vlan_transparent(self): |
|
self.skipTest("Currently no support in plugin for this") |
|
|
|
def _test_create_bridge_network(self, vlan_id=0): |
|
net_type = vlan_id and 'vlan' or 'flat' |
|
name = 'bridge_net' |
|
expected = [('subnets', []), ('name', name), ('admin_state_up', True), |
|
('status', 'ACTIVE'), ('shared', False), |
|
(pnet.NETWORK_TYPE, net_type), |
|
(pnet.PHYSICAL_NETWORK, 'tzuuid'), |
|
(pnet.SEGMENTATION_ID, vlan_id)] |
|
providernet_args = {pnet.NETWORK_TYPE: net_type, |
|
pnet.PHYSICAL_NETWORK: 'tzuuid'} |
|
if vlan_id: |
|
providernet_args[pnet.SEGMENTATION_ID] = vlan_id |
|
with self.network(name=name, |
|
providernet_args=providernet_args, |
|
arg_list=(pnet.NETWORK_TYPE, |
|
pnet.PHYSICAL_NETWORK, |
|
pnet.SEGMENTATION_ID)) as net: |
|
for k, v in expected: |
|
self.assertEqual(net['network'][k], v) |
|
|
|
def test_create_bridge_network(self): |
|
self._test_create_bridge_network() |
|
|
|
def test_create_bridge_vlan_network(self): |
|
self._test_create_bridge_network(vlan_id=123) |
|
|
|
def test_create_bridge_vlan_network_outofrange_returns_400(self): |
|
with testlib_api.ExpectedException( |
|
webob.exc.HTTPClientError) as ctx_manager: |
|
self._test_create_bridge_network(vlan_id=5000) |
|
self.assertEqual(ctx_manager.exception.code, 400) |
|
|
|
def test_create_external_portgroup_network(self): |
|
name = 'ext_net' |
|
expected = [('subnets', []), ('name', name), ('admin_state_up', True), |
|
('status', 'ACTIVE'), ('shared', False), |
|
(external_net.EXTERNAL, True), |
|
(pnet.NETWORK_TYPE, 'portgroup'), |
|
(pnet.PHYSICAL_NETWORK, 'tzuuid')] |
|
providernet_args = {pnet.NETWORK_TYPE: 'portgroup', |
|
pnet.PHYSICAL_NETWORK: 'tzuuid', |
|
external_net.EXTERNAL: True} |
|
with self.network(name=name, |
|
providernet_args=providernet_args, |
|
arg_list=(pnet.NETWORK_TYPE, |
|
pnet.PHYSICAL_NETWORK, |
|
external_net.EXTERNAL)) as net: |
|
for k, v in expected: |
|
self.assertEqual(net['network'][k], v) |
|
|
|
def test_delete_network_after_removing_subnet(self): |
|
gateway_ip = '10.0.0.1' |
|
cidr = '10.0.0.0/24' |
|
fmt = 'json' |
|
# Create new network |
|
res = self._create_network(fmt=fmt, name='net', |
|
admin_state_up=True) |
|
network = self.deserialize(fmt, res) |
|
subnet = self._make_subnet(fmt, network, gateway_ip, |
|
cidr, ip_version=4) |
|
req = self.new_delete_request('subnets', subnet['subnet']['id']) |
|
sub_del_res = req.get_response(self.api) |
|
self.assertEqual(sub_del_res.status_int, 204) |
|
req = self.new_delete_request('networks', network['network']['id']) |
|
net_del_res = req.get_response(self.api) |
|
self.assertEqual(net_del_res.status_int, 204) |
|
|
|
def test_list_networks_with_shared(self): |
|
with self.network(name='net1'): |
|
with self.network(name='net2', shared=True): |
|
req = self.new_list_request('networks') |
|
res = self.deserialize('json', req.get_response(self.api)) |
|
self.assertEqual(len(res['networks']), 2) |
|
req_2 = self.new_list_request('networks') |
|
req_2.environ['neutron.context'] = context.Context('', |
|
'somebody') |
|
res = self.deserialize('json', req_2.get_response(self.api)) |
|
# tenant must see a single network |
|
self.assertEqual(len(res['networks']), 1) |
|
|
|
def test_create_network_name_exceeds_40_chars(self): |
|
name = 'this_is_a_network_whose_name_is_longer_than_40_chars' |
|
with self.network(name=name) as net: |
|
# Assert neutron name is not truncated |
|
self.assertEqual(net['network']['name'], name) |
|
|
|
def test_update_network_with_admin_false(self): |
|
data = {'network': {'admin_state_up': False}} |
|
with self.network() as net: |
|
plugin = manager.NeutronManager.get_plugin() |
|
self.assertRaises(NotImplementedError, |
|
plugin.update_network, |
|
context.get_admin_context(), |
|
net['network']['id'], data) |
|
|
|
def test_create_extend_dvs_provider_network(self): |
|
name = 'provider_net' |
|
expected = [('subnets', []), ('name', name), ('admin_state_up', True), |
|
('status', 'ACTIVE'), ('shared', False), |
|
(pnet.NETWORK_TYPE, 'flat'), |
|
(pnet.PHYSICAL_NETWORK, 'dvs-uuid')] |
|
providernet_args = {pnet.NETWORK_TYPE: 'flat', |
|
pnet.PHYSICAL_NETWORK: 'dvs-uuid'} |
|
with self.network(name=name, |
|
providernet_args=providernet_args, |
|
arg_list=(pnet.NETWORK_TYPE, |
|
pnet.PHYSICAL_NETWORK)) as net: |
|
for k, v in expected: |
|
self.assertEqual(net['network'][k], v) |
|
|
|
def test_create_same_vlan_network_with_different_dvs(self): |
|
name = 'dvs-provider-net' |
|
expected = [('subnets', []), ('name', name), ('admin_state_up', True), |
|
('status', 'ACTIVE'), ('shared', False), |
|
(pnet.NETWORK_TYPE, 'vlan'), |
|
(pnet.SEGMENTATION_ID, 43), |
|
(pnet.PHYSICAL_NETWORK, 'dvs-uuid-1')] |
|
providernet_args = {pnet.NETWORK_TYPE: 'vlan', |
|
pnet.SEGMENTATION_ID: 43, |
|
pnet.PHYSICAL_NETWORK: 'dvs-uuid-1'} |
|
with self.network(name=name, |
|
providernet_args=providernet_args, |
|
arg_list=(pnet.NETWORK_TYPE, |
|
pnet.SEGMENTATION_ID, |
|
pnet.PHYSICAL_NETWORK)) as net: |
|
for k, v in expected: |
|
self.assertEqual(net['network'][k], v) |
|
|
|
expected_same_vlan = [(pnet.NETWORK_TYPE, 'vlan'), |
|
(pnet.SEGMENTATION_ID, 43), |
|
(pnet.PHYSICAL_NETWORK, 'dvs-uuid-2')] |
|
providernet_args_1 = {pnet.NETWORK_TYPE: 'vlan', |
|
pnet.SEGMENTATION_ID: 43, |
|
pnet.PHYSICAL_NETWORK: 'dvs-uuid-2'} |
|
with self.network(name=name, |
|
providernet_args=providernet_args_1, |
|
arg_list=(pnet.NETWORK_TYPE, |
|
pnet.SEGMENTATION_ID, |
|
pnet.PHYSICAL_NETWORK)) as net1: |
|
for k, v in expected_same_vlan: |
|
self.assertEqual(net1['network'][k], v) |
|
|
|
|
|
class TestVnicIndex(NsxVPluginV2TestCase, |
|
test_vnic_index.VnicIndexDbTestCase): |
|
def test_update_port_twice_with_the_same_index(self): |
|
"""Tests that updates which does not modify the port vnic |
|
index association do not produce any errors |
|
""" |
|
with self.subnet() as subnet: |
|
with self.port(subnet=subnet) as port: |
|
res = self._port_index_update(port['port']['id'], 2) |
|
self.assertEqual(2, res['port'][ext_vnic_idx.VNIC_INDEX]) |
|
res = self._port_index_update(port['port']['id'], 2) |
|
self.assertEqual(2, res['port'][ext_vnic_idx.VNIC_INDEX]) |
|
|
|
|
|
class TestPortsV2(NsxVPluginV2TestCase, |
|
test_plugin.TestPortsV2, |
|
test_bindings.PortBindingsTestCase, |
|
test_bindings.PortBindingsHostTestCaseMixin, |
|
test_bindings.PortBindingsVnicTestCaseMixin): |
|
|
|
VIF_TYPE = nsx_constants.VIF_TYPE_DVS |
|
HAS_PORT_FILTER = True |
|
|
|
def test_create_port_json(self): |
|
keys = [('admin_state_up', True), ('status', self.port_create_status)] |
|
with self.port(name='myname') as port: |
|
for k, v in keys: |
|
self.assertEqual(port['port'][k], v) |
|
self.assertIn('mac_address', port['port']) |
|
ips = port['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.3') |
|
self.assertEqual('myname', port['port']['name']) |
|
|
|
def test_list_ports(self): |
|
# for this test we need to enable overlapping ips |
|
cfg.CONF.set_default('allow_overlapping_ips', True) |
|
with self.subnet(enable_dhcp=False) as subnet,\ |
|
self.port(subnet) as port1,\ |
|
self.port(subnet) as port2,\ |
|
self.port(subnet) as port3: |
|
self._test_list_resources('port', [port1, port2, port3]) |
|
|
|
def test_list_ports_public_network(self): |
|
with self.network(shared=True) as network: |
|
with self.subnet(network, enable_dhcp=False) as subnet,\ |
|
self.port(subnet, tenant_id='tenant_1') as port1,\ |
|
self.port(subnet, tenant_id='tenant_2') as port2: |
|
|
|
# Admin request - must return both ports |
|
self._test_list_resources('port', [port1, port2]) |
|
# Tenant_1 request - must return single port |
|
q_context = context.Context('', 'tenant_1') |
|
self._test_list_resources('port', [port1], |
|
neutron_context=q_context) |
|
# Tenant_2 request - must return single port |
|
q_context = context.Context('', 'tenant_2') |
|
self._test_list_resources('port', [port2], |
|
neutron_context=q_context) |
|
|
|
def test_list_ports_with_pagination_emulated(self): |
|
helper_patcher = mock.patch( |
|
'neutron.api.v2.base.Controller._get_pagination_helper', |
|
new=test_plugin._fake_get_pagination_helper) |
|
helper_patcher.start() |
|
cfg.CONF.set_default('allow_overlapping_ips', True) |
|
with self.subnet(enable_dhcp=False) as subnet,\ |
|
self.port(subnet, mac_address='00:00:00:00:00:01') as port1,\ |
|
self.port(subnet, mac_address='00:00:00:00:00:02') as port2,\ |
|
self.port(subnet, mac_address='00:00:00:00:00:03') as port3: |
|
self._test_list_with_pagination('port', |
|
(port1, port2, port3), |
|
('mac_address', 'asc'), 2, 2) |
|
|
|
def test_list_ports_with_pagination_native(self): |
|
if self._skip_native_pagination: |
|
self.skipTest("Skip test for not implemented pagination feature") |
|
cfg.CONF.set_default('allow_overlapping_ips', True) |
|
with self.subnet(enable_dhcp=False) as subnet,\ |
|
self.port(subnet, mac_address='00:00:00:00:00:01') as port1,\ |
|
self.port(subnet, mac_address='00:00:00:00:00:02') as port2,\ |
|
self.port(subnet, mac_address='00:00:00:00:00:03') as port3: |
|
self._test_list_with_pagination('port', |
|
(port1, port2, port3), |
|
('mac_address', 'asc'), 2, 2) |
|
|
|
def test_list_ports_with_sort_emulated(self): |
|
helper_patcher = mock.patch( |
|
'neutron.api.v2.base.Controller._get_sorting_helper', |
|
new=test_plugin._fake_get_sorting_helper) |
|
helper_patcher.start() |
|
cfg.CONF.set_default('allow_overlapping_ips', True) |
|
with self.subnet(enable_dhcp=False) as subnet,\ |
|
self.port(subnet, admin_state_up='True', |
|
mac_address='00:00:00:00:00:01') as port1,\ |
|
self.port(subnet, admin_state_up='False', |
|
mac_address='00:00:00:00:00:02') as port2,\ |
|
self.port(subnet, admin_state_up='False', |
|
mac_address='00:00:00:00:00:03') as port3: |
|
|
|
self._test_list_with_sort('port', (port3, port2, port1), |
|
[('admin_state_up', 'asc'), |
|
('mac_address', 'desc')]) |
|
|
|
def test_list_ports_with_sort_native(self): |
|
if self._skip_native_sorting: |
|
self.skipTest("Skip test for not implemented sorting feature") |
|
cfg.CONF.set_default('allow_overlapping_ips', True) |
|
with self.subnet(enable_dhcp=False) as subnet,\ |
|
self.port(subnet, admin_state_up='True', |
|
mac_address='00:00:00:00:00:01') as port1,\ |
|
self.port(subnet, admin_state_up='False', |
|
mac_address='00:00:00:00:00:02') as port2,\ |
|
self.port(subnet, admin_state_up='False', |
|
mac_address='00:00:00:00:00:03') as port3: |
|
|
|
self._test_list_with_sort('port', (port3, port2, port1), |
|
[('admin_state_up', 'asc'), |
|
('mac_address', 'desc')]) |
|
|
|
def test_update_port_delete_ip(self): |
|
# This test case overrides the default because the nsx plugin |
|
# implements port_security/security groups and it is not allowed |
|
# to remove an ip address from a port unless the security group |
|
# is first removed. |
|
with self.subnet() as subnet: |
|
with self.port(subnet=subnet) as port: |
|
data = {'port': {'admin_state_up': False, |
|
'fixed_ips': [], |
|
secgrp.SECURITYGROUPS: []}} |
|
req = self.new_update_request('ports', |
|
data, port['port']['id']) |
|
res = self.deserialize('json', req.get_response(self.api)) |
|
self.assertEqual(res['port']['admin_state_up'], |
|
data['port']['admin_state_up']) |
|
self.assertEqual(res['port']['fixed_ips'], |
|
data['port']['fixed_ips']) |
|
|
|
def _update_port_index(self, port_id, device_id, index): |
|
data = {'port': {'device_id': device_id, 'vnic_index': index}} |
|
req = self.new_update_request('ports', |
|
data, port_id) |
|
res = self.deserialize('json', req.get_response(self.api)) |
|
return res |
|
|
|
@mock.patch.object(edge_utils, 'delete_dhcp_binding') |
|
def test_update_port_index(self, delete_dhcp_binding): |
|
q_context = context.Context('', 'tenant_1') |
|
device_id = _uuid() |
|
with self.subnet() as subnet: |
|
with self.port(subnet=subnet, |
|
device_id=device_id, |
|
device_owner='compute:None') as port: |
|
self.assertIsNone(port['port']['vnic_index']) |
|
|
|
vnic_index = 3 |
|
res = self._update_port_index( |
|
port['port']['id'], device_id, vnic_index) |
|
self.assertEqual(vnic_index, res['port']['vnic_index']) |
|
|
|
# Updating the vnic_index to None implies the vnic does |
|
# no longer obtain the addresses associated with this port, |
|
# we need to inactivate previous addresses configurations for |
|
# this vnic in the context of this network spoofguard policy. |
|
self.fc2.inactivate_vnic_assigned_addresses = ( |
|
mock.Mock().inactivate_vnic_assigned_addresses) |
|
|
|
policy_id = nsxv_db.get_spoofguard_policy_id( |
|
q_context.session, port['port']['network_id']) |
|
|
|
res = self._update_port_index(port['port']['id'], '', None) |
|
|
|
vnic_id = '%s.%03d' % (device_id, vnic_index) |
|
(self.fc2.inactivate_vnic_assigned_addresses. |
|
assert_called_once_with(policy_id, vnic_id)) |
|
self.assertTrue(delete_dhcp_binding.called) |
|
|
|
def test_update_port_with_compute_device_owner(self): |
|
""" |
|
Test that DHCP binding is created when ports 'device_owner' |
|
is updated to compute, for example when attaching an interface to a |
|
instance with existing port. |
|
""" |
|
with self.port() as port: |
|
with mock.patch(PLUGIN_NAME + '._create_dhcp_static_binding'): |
|
update = {'port': {'device_owner'}} |
|
self.new_update_request('ports', |
|
update, port['port']['id']) |
|
|
|
def test_create_port_public_network_with_ip(self): |
|
with self.network(shared=True) as network: |
|
with self.subnet(enable_dhcp=False, |
|
network=network, cidr='10.0.0.0/24') as subnet: |
|
keys = [('admin_state_up', True), |
|
('status', self.port_create_status), |
|
('fixed_ips', [{'subnet_id': subnet['subnet']['id'], |
|
'ip_address': '10.0.0.2'}])] |
|
port_res = self._create_port(self.fmt, |
|
network['network']['id'], |
|
webob.exc.HTTPCreated.code, |
|
tenant_id='another_tenant', |
|
set_context=True) |
|
port = self.deserialize(self.fmt, port_res) |
|
for k, v in keys: |
|
self.assertEqual(port['port'][k], v) |
|
self.assertIn('mac_address', port['port']) |
|
self._delete('ports', port['port']['id']) |
|
|
|
def test_no_more_port_exception(self): |
|
with self.subnet(enable_dhcp=False, cidr='10.0.0.0/31', |
|
gateway_ip=None) as subnet: |
|
id = subnet['subnet']['network_id'] |
|
res = self._create_port(self.fmt, id) |
|
data = self.deserialize(self.fmt, res) |
|
msg = str(n_exc.IpAddressGenerationFailure(net_id=id)) |
|
self.assertEqual(data['NeutronError']['message'], msg) |
|
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code) |
|
|
|
def test_ports_vif_host(self): |
|
cfg.CONF.set_default('allow_overlapping_ips', True) |
|
host_arg = {portbindings.HOST_ID: self.hostname} |
|
with self.subnet(enable_dhcp=False) as subnet,\ |
|
self.port(subnet, name='name1', |
|
arg_list=(portbindings.HOST_ID,), **host_arg),\ |
|
self.port(subnet, name='name2'): |
|
ctx = context.get_admin_context() |
|
ports = self._list('ports', neutron_context=ctx)['ports'] |
|
self.assertEqual(2, len(ports)) |
|
for port in ports: |
|
if port['name'] == 'name1': |
|
self._check_response_portbindings_host(port) |
|
else: |
|
self.assertFalse(port[portbindings.HOST_ID]) |
|
# By default user is admin - now test non admin user |
|
ctx = context.Context(user_id=None, |
|
tenant_id=self._tenant_id, |
|
is_admin=False, |
|
read_deleted="no") |
|
ports = self._list('ports', neutron_context=ctx)['ports'] |
|
self.assertEqual(2, len(ports)) |
|
for non_admin_port in ports: |
|
self._check_response_no_portbindings_host(non_admin_port) |
|
|
|
def test_ports_vif_host_update(self): |
|
cfg.CONF.set_default('allow_overlapping_ips', True) |
|
host_arg = {portbindings.HOST_ID: self.hostname} |
|
with self.subnet(enable_dhcp=False) as subnet,\ |
|
self.port(subnet, name='name1', |
|
arg_list=(portbindings.HOST_ID,), |
|
**host_arg) as port1,\ |
|
self.port(subnet, name='name2') as port2: |
|
data = {'port': {portbindings.HOST_ID: 'testhosttemp'}} |
|
req = self.new_update_request( |
|
'ports', data, port1['port']['id']) |
|
req.get_response(self.api) |
|
req = self.new_update_request( |
|
'ports', data, port2['port']['id']) |
|
ctx = context.get_admin_context() |
|
req.get_response(self.api) |
|
ports = self._list('ports', neutron_context=ctx)['ports'] |
|
self.assertEqual(2, len(ports)) |
|
for port in ports: |
|
self.assertEqual('testhosttemp', port[portbindings.HOST_ID]) |
|
|
|
def test_ports_vif_details(self): |
|
plugin = manager.NeutronManager.get_plugin() |
|
cfg.CONF.set_default('allow_overlapping_ips', True) |
|
with self.subnet(enable_dhcp=False) as subnet,\ |
|
self.port(subnet), self.port(subnet): |
|
ctx = context.get_admin_context() |
|
ports = plugin.get_ports(ctx) |
|
self.assertEqual(len(ports), 2) |
|
for port in ports: |
|
self._check_response_portbindings(port) |
|
# By default user is admin - now test non admin user |
|
ctx = self._get_non_admin_context() |
|
ports = self._list('ports', neutron_context=ctx)['ports'] |
|
self.assertEqual(len(ports), 2) |
|
for non_admin_port in ports: |
|
self._check_response_no_portbindings(non_admin_port) |
|
|
|
def test_ports_vnic_type(self): |
|
cfg.CONF.set_default('allow_overlapping_ips', True) |
|
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type} |
|
with self.subnet(enable_dhcp=False) as subnet,\ |
|
self.port(subnet, name='name1', |
|
arg_list=(portbindings.VNIC_TYPE,), **vnic_arg),\ |
|
self.port(subnet, name='name2'): |
|
ctx = context.get_admin_context() |
|
ports = self._list('ports', neutron_context=ctx)['ports'] |
|
self.assertEqual(2, len(ports)) |
|
for port in ports: |
|
if port['name'] == 'name1': |
|
self._check_response_portbindings_vnic_type(port) |
|
else: |
|
self.assertEqual(portbindings.VNIC_NORMAL, |
|
port[portbindings.VNIC_TYPE]) |
|
# By default user is admin - now test non admin user |
|
ctx = context.Context(user_id=None, |
|
tenant_id=self._tenant_id, |
|
is_admin=False, |
|
read_deleted="no") |
|
ports = self._list('ports', neutron_context=ctx)['ports'] |
|
self.assertEqual(2, len(ports)) |
|
for non_admin_port in ports: |
|
self._check_response_portbindings_vnic_type(non_admin_port) |
|
|
|
def test_ports_vnic_type_list(self): |
|
cfg.CONF.set_default('allow_overlapping_ips', True) |
|
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type} |
|
with self.subnet(enable_dhcp=False) as subnet,\ |
|
self.port(subnet, name='name1', |
|
arg_list=(portbindings.VNIC_TYPE,), |
|
**vnic_arg) as port1,\ |
|
self.port(subnet, name='name2') as port2,\ |
|
self.port(subnet, name='name3', |
|
arg_list=(portbindings.VNIC_TYPE,), |
|
**vnic_arg) as port3: |
|
|
|
self._test_list_resources('port', (port1, port2, port3), |
|
query_params='%s=%s' % ( |
|
portbindings.VNIC_TYPE, |
|
self.vnic_type)) |
|
|
|
def test_range_allocation(self): |
|
with self.subnet(enable_dhcp=False, gateway_ip='10.0.0.3', |
|
cidr='10.0.0.0/29') as subnet: |
|
kwargs = {"fixed_ips": |
|
[{'subnet_id': subnet['subnet']['id']}, |
|
{'subnet_id': subnet['subnet']['id']}, |
|
{'subnet_id': subnet['subnet']['id']}, |
|
{'subnet_id': subnet['subnet']['id']}, |
|
{'subnet_id': subnet['subnet']['id']}]} |
|
net_id = subnet['subnet']['network_id'] |
|
res = self._create_port(self.fmt, net_id=net_id, **kwargs) |
|
port = self.deserialize(self.fmt, res) |
|
ips = port['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 5) |
|
alloc = ['10.0.0.1', '10.0.0.2', '10.0.0.4', '10.0.0.5', |
|
'10.0.0.6'] |
|
for ip in ips: |
|
self.assertIn(ip['ip_address'], alloc) |
|
self.assertEqual(ip['subnet_id'], |
|
subnet['subnet']['id']) |
|
alloc.remove(ip['ip_address']) |
|
self.assertEqual(len(alloc), 0) |
|
self._delete('ports', port['port']['id']) |
|
|
|
with self.subnet(enable_dhcp=False, gateway_ip='11.0.0.6', |
|
cidr='11.0.0.0/29') as subnet: |
|
kwargs = {"fixed_ips": |
|
[{'subnet_id': subnet['subnet']['id']}, |
|
{'subnet_id': subnet['subnet']['id']}, |
|
{'subnet_id': subnet['subnet']['id']}, |
|
{'subnet_id': subnet['subnet']['id']}, |
|
{'subnet_id': subnet['subnet']['id']}]} |
|
net_id = subnet['subnet']['network_id'] |
|
res = self._create_port(self.fmt, net_id=net_id, **kwargs) |
|
port = self.deserialize(self.fmt, res) |
|
ips = port['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 5) |
|
alloc = ['11.0.0.1', '11.0.0.2', '11.0.0.3', '11.0.0.4', |
|
'11.0.0.5'] |
|
for ip in ips: |
|
self.assertIn(ip['ip_address'], alloc) |
|
self.assertEqual(ip['subnet_id'], |
|
subnet['subnet']['id']) |
|
alloc.remove(ip['ip_address']) |
|
self.assertEqual(len(alloc), 0) |
|
self._delete('ports', port['port']['id']) |
|
|
|
def test_requested_duplicate_ip(self): |
|
with self.subnet(enable_dhcp=False) as subnet: |
|
with self.port(subnet=subnet) as port: |
|
ips = port['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.2') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
# Check configuring of duplicate IP |
|
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'], |
|
'ip_address': ips[0]['ip_address']}]} |
|
net_id = port['port']['network_id'] |
|
res = self._create_port(self.fmt, net_id=net_id, **kwargs) |
|
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code) |
|
|
|
def test_requested_invalid_fixed_ips(self): |
|
with self.subnet(enable_dhcp=False) as subnet: |
|
with self.port(subnet=subnet) as port: |
|
ips = port['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.2') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
# Test invalid subnet_id |
|
kwargs = {"fixed_ips": |
|
[{'subnet_id': subnet['subnet']['id']}, |
|
{'subnet_id': |
|
'00000000-ffff-ffff-ffff-000000000000'}]} |
|
net_id = port['port']['network_id'] |
|
res = self._create_port(self.fmt, net_id=net_id, **kwargs) |
|
port2 = self.deserialize(self.fmt, res) |
|
self.assertEqual(res.status_int, webob.exc.HTTPNotFound.code) |
|
|
|
# Test invalid IP address on specified subnet_id |
|
kwargs = {"fixed_ips": |
|
[{'subnet_id': subnet['subnet']['id'], |
|
'ip_address': '1.1.1.1'}]} |
|
net_id = port['port']['network_id'] |
|
res = self._create_port(self.fmt, net_id=net_id, **kwargs) |
|
port2 = self.deserialize(self.fmt, res) |
|
self.assertEqual(res.status_int, |
|
webob.exc.HTTPClientError.code) |
|
|
|
# Test invalid addresses - IP's not on subnet or network |
|
# address or broadcast address |
|
bad_ips = ['1.1.1.1', '10.0.0.0', '10.0.0.255'] |
|
net_id = port['port']['network_id'] |
|
for ip in bad_ips: |
|
kwargs = {"fixed_ips": [{'ip_address': ip}]} |
|
res = self._create_port(self.fmt, net_id=net_id, **kwargs) |
|
port2 = self.deserialize(self.fmt, res) |
|
self.assertEqual(res.status_int, |
|
webob.exc.HTTPClientError.code) |
|
|
|
# Enable allocation of gateway address |
|
kwargs = {"fixed_ips": |
|
[{'subnet_id': subnet['subnet']['id'], |
|
'ip_address': '10.0.0.1'}]} |
|
net_id = port['port']['network_id'] |
|
res = self._create_port(self.fmt, net_id=net_id, **kwargs) |
|
port2 = self.deserialize(self.fmt, res) |
|
ips = port2['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.1') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
self._delete('ports', port2['port']['id']) |
|
|
|
def test_requested_split(self): |
|
with self.subnet(enable_dhcp=False) as subnet: |
|
with self.port(subnet=subnet) as port: |
|
ports_to_delete = [] |
|
ips = port['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.2') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
# Allocate specific IP |
|
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'], |
|
'ip_address': '10.0.0.5'}]} |
|
net_id = port['port']['network_id'] |
|
res = self._create_port(self.fmt, net_id=net_id, **kwargs) |
|
port2 = self.deserialize(self.fmt, res) |
|
ports_to_delete.append(port2) |
|
ips = port2['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.5') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
# Allocate specific IP's |
|
allocated = ['10.0.0.3', '10.0.0.4', '10.0.0.6'] |
|
|
|
for a in allocated: |
|
res = self._create_port(self.fmt, net_id=net_id) |
|
port2 = self.deserialize(self.fmt, res) |
|
ports_to_delete.append(port2) |
|
ips = port2['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], a) |
|
self.assertEqual(ips[0]['subnet_id'], |
|
subnet['subnet']['id']) |
|
|
|
for p in ports_to_delete: |
|
self._delete('ports', p['port']['id']) |
|
|
|
def test_requested_ips_only(self): |
|
with self.subnet(enable_dhcp=False) as subnet: |
|
with self.port(subnet=subnet) as port: |
|
ips = port['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.2') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
ips_only = ['10.0.0.18', '10.0.0.20', '10.0.0.22', '10.0.0.21', |
|
'10.0.0.3', '10.0.0.17', '10.0.0.19'] |
|
ports_to_delete = [] |
|
for i in ips_only: |
|
kwargs = {"fixed_ips": [{'ip_address': i}]} |
|
net_id = port['port']['network_id'] |
|
res = self._create_port(self.fmt, net_id=net_id, **kwargs) |
|
port = self.deserialize(self.fmt, res) |
|
ports_to_delete.append(port) |
|
ips = port['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], i) |
|
self.assertEqual(ips[0]['subnet_id'], |
|
subnet['subnet']['id']) |
|
for p in ports_to_delete: |
|
self._delete('ports', p['port']['id']) |
|
|
|
def test_requested_subnet_id(self): |
|
with self.subnet(enable_dhcp=False) as subnet: |
|
with self.port(subnet=subnet) as port: |
|
ips = port['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.2') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
# Request a IP from specific subnet |
|
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id']}]} |
|
net_id = port['port']['network_id'] |
|
res = self._create_port(self.fmt, net_id=net_id, **kwargs) |
|
port2 = self.deserialize(self.fmt, res) |
|
ips = port2['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.3') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
self._delete('ports', port2['port']['id']) |
|
|
|
def test_requested_subnet_id_v4_and_v6(self): |
|
with self.subnet(enable_dhcp=False) as subnet: |
|
# Get a IPv4 and IPv6 address |
|
tenant_id = subnet['subnet']['tenant_id'] |
|
net_id = subnet['subnet']['network_id'] |
|
res = self._create_subnet( |
|
self.fmt, |
|
tenant_id=tenant_id, |
|
net_id=net_id, |
|
cidr='2607:f0d0:1002:51::/124', |
|
ip_version=6, |
|
gateway_ip=attributes.ATTR_NOT_SPECIFIED, |
|
enable_dhcp=False) |
|
subnet2 = self.deserialize(self.fmt, res) |
|
kwargs = {"fixed_ips": |
|
[{'subnet_id': subnet['subnet']['id']}, |
|
{'subnet_id': subnet2['subnet']['id']}]} |
|
res = self._create_port(self.fmt, net_id=net_id, **kwargs) |
|
port3 = self.deserialize(self.fmt, res) |
|
ips = port3['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 2) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.2') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
self.assertEqual(ips[1]['ip_address'], '2607:f0d0:1002:51::2') |
|
self.assertEqual(ips[1]['subnet_id'], subnet2['subnet']['id']) |
|
res = self._create_port(self.fmt, net_id=net_id) |
|
port4 = self.deserialize(self.fmt, res) |
|
# Check that a v4 and a v6 address are allocated |
|
ips = port4['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 2) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.3') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
self.assertEqual(ips[1]['ip_address'], '2607:f0d0:1002:51::3') |
|
self.assertEqual(ips[1]['subnet_id'], subnet2['subnet']['id']) |
|
self._delete('ports', port3['port']['id']) |
|
self._delete('ports', port4['port']['id']) |
|
|
|
def test_update_port_add_additional_ip(self): |
|
"""Test update of port with additional IP.""" |
|
with self.subnet(enable_dhcp=False) as subnet: |
|
with self.port(subnet=subnet) as port: |
|
data = {'port': {'admin_state_up': False, |
|
'fixed_ips': [{'subnet_id': |
|
subnet['subnet']['id']}, |
|
{'subnet_id': |
|
subnet['subnet']['id']}]}} |
|
req = self.new_update_request('ports', data, |
|
port['port']['id']) |
|
res = self.deserialize(self.fmt, req.get_response(self.api)) |
|
self.assertEqual(res['port']['admin_state_up'], |
|
data['port']['admin_state_up']) |
|
ips = res['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 2) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.3') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
self.assertEqual(ips[1]['ip_address'], '10.0.0.4') |
|
self.assertEqual(ips[1]['subnet_id'], subnet['subnet']['id']) |
|
|
|
def test_update_port_update_ip(self): |
|
"""Test update of port IP. |
|
|
|
Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10. |
|
""" |
|
with self.subnet(enable_dhcp=False) as subnet: |
|
with self.port(subnet=subnet) as port: |
|
ips = port['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.2') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
data = {'port': {'fixed_ips': [{'subnet_id': |
|
subnet['subnet']['id'], |
|
'ip_address': "10.0.0.10"}]}} |
|
req = self.new_update_request('ports', data, |
|
port['port']['id']) |
|
res = self.deserialize(self.fmt, req.get_response(self.api)) |
|
ips = res['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.10') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
|
|
def test_update_port_update_ip_address_only(self): |
|
with self.subnet(enable_dhcp=False) as subnet: |
|
with self.port(subnet=subnet) as port: |
|
ips = port['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 1) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.2') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
data = {'port': {'fixed_ips': [{'subnet_id': |
|
subnet['subnet']['id'], |
|
'ip_address': "10.0.0.10"}, |
|
{'ip_address': "10.0.0.2"}]}} |
|
req = self.new_update_request('ports', data, |
|
port['port']['id']) |
|
res = self.deserialize(self.fmt, req.get_response(self.api)) |
|
ips = res['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 2) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.2') |
|
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) |
|
self.assertEqual(ips[1]['ip_address'], '10.0.0.10') |
|
self.assertEqual(ips[1]['subnet_id'], subnet['subnet']['id']) |
|
|
|
def test_requested_subnet_id_v4_and_v6_slaac(self): |
|
with self.network() as network,\ |
|
self.subnet(network, enable_dhcp=False) as subnet,\ |
|
self.subnet(network, |
|
cidr='2607:f0d0:1002:51::/64', |
|
ip_version=6, |
|
ipv6_address_mode=constants.IPV6_SLAAC, |
|
gateway_ip='fe80::1') as subnet2: |
|
with self.port(subnet, |
|
fixed_ips=[{'subnet_id': subnet['subnet']['id']}, |
|
{'subnet_id': subnet2['subnet']['id']}] |
|
) as port: |
|
|
|
ips = port['port']['fixed_ips'] |
|
self.assertEqual(len(ips), 2) |
|
self.assertEqual(ips[0]['ip_address'], '10.0.0.2') |
|
port_mac = port['port']['mac_address'] |
|
subnet_cidr = subnet2['subnet']['cidr'] |
|
eui_addr = str(ipv6_utils.get_ipv6_addr_by_EUI64( |
|
subnet_cidr, port_mac)) |
|
self.assertEqual(ips[1]['ip_address'], eui_addr) |
|
|
|
def _test_create_port_with_ipv6_subnet_in_fixed_ips(self, addr_mode, |
|
ipv6_pd=False): |
|
"""Test port create with an IPv6 subnet incl in fixed IPs.""" |
|
with self.network(name='net') as network: |
|
subnet = self._make_v6_subnet(network, addr_mode, ipv6_pd) |
|
subnet_id = subnet['subnet']['id'] |
|
fixed_ips = [{'subnet_id': subnet_id}] |
|
with self.port(subnet=subnet, fixed_ips=fixed_ips) as port: |
|
if addr_mode == constants.IPV6_SLAAC: |
|
exp_ip_addr = self._calc_ipv6_addr_by_EUI64(port, subnet) |
|
else: |
|
exp_ip_addr = 'fe80::3' |
|
port_fixed_ips = port['port']['fixed_ips'] |
|
self.assertEqual(1, len(port_fixed_ips)) |
|
self.assertEqual(exp_ip_addr, |
|
port_fixed_ips[0]['ip_address']) |
|
|
|
def test_create_port_with_ipv6_slaac_subnet_in_fixed_ips(self): |
|
self._test_create_port_with_ipv6_subnet_in_fixed_ips( |
|
addr_mode=constants.IPV6_SLAAC) |
|
|
|
def test_create_port_with_ipv6_dhcp_stateful_subnet_in_fixed_ips(self): |
|
with testlib_api.ExpectedException( |
|
webob.exc.HTTPClientError) as ctx_manager: |
|
self._test_create_port_with_ipv6_subnet_in_fixed_ips( |
|
addr_mode=constants.DHCPV6_STATEFUL) |
|
self.assertEqual(ctx_manager.exception.code, 400) |
|
|
|
def test_create_router_port_ipv4_and_ipv6_slaac_no_fixed_ips(self): |
|
# Create an IPv4 and an IPv6 SLAAC subnet on the network |
|
with self.network() as network,\ |
|
self.subnet(network),\ |
|
self.subnet(network, |
|
cidr='2607:f0d0:1002:51::/64', |
|
ip_version=6, |
|
gateway_ip='fe80::1', |
|
ipv6_address_mode=constants.IPV6_SLAAC): |
|
# Create a router port without specifying fixed_ips |
|
port = self._make_port( |
|
self.fmt, network['network']['id'], |
|
device_owner=constants.DEVICE_OWNER_ROUTER_INTF) |
|
# Router port should only have an IPv4 address |
|
fixed_ips = port['port']['fixed_ips'] |
|
self.assertEqual(1, len(fixed_ips)) |
|
self.assertEqual('10.0.0.3', fixed_ips[0]['ip_address']) |
|
|
|
def test_create_port_with_multiple_ipv4_and_ipv6_subnets(self): |
|
# This test should fail as the NSX-v plugin should cause Neutron to |
|
# return a 400 status code |
|
with testlib_api.ExpectedException( |
|
webob.exc.HTTPClientError) as ctx_manager: |
|
super(TestPortsV2, self).\ |
|
test_create_port_with_multiple_ipv4_and_ipv6_subnets() |
|
self.assertEqual(ctx_manager.exception.code, 400) |
|
|
|
|
|
class TestSubnetsV2(NsxVPluginV2TestCase, |
|
test_plugin.TestSubnetsV2): |
|
def setUp(self, |
|
plugin=PLUGIN_NAME, |
|
ext_mgr=None, |
|
service_plugins=None): |
|
super(TestSubnetsV2, self).setUp() |
|
self.context = context.get_admin_context() |
|
|
|
def _create_subnet_bulk(self, fmt, number, net_id, name, |
|
ip_version=4, **kwargs): |
|
base_data = {'subnet': {'network_id': net_id, |
|
'ip_version': ip_version, |
|
'enable_dhcp': False, |
|
'tenant_id': self._tenant_id}} |
|
# auto-generate cidrs as they should not overlap |
|
overrides = dict((k, v) |
|
for (k, v) in zip(range(number), |
|
[{'cidr': "10.0.%s.0/24" % num} |
|
for num in range(number)])) |
|
kwargs.update({'override': overrides}) |
|
return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs) |
|
|
|
def test_create_subnet_with_two_host_routes(self): |
|
self.skipTest("Skip test for not implemented host_routes feature") |
|
|
|
def test_delete_subnet_with_route(self): |
|
self.skipTest("Skip test for not implemented host_routes feature") |
|
|
|
def test_update_subnet_adding_additional_host_routes_and_dns(self): |
|
self.skipTest("Skip test for not implemented host_routes feature") |
|
|
|
def test_delete_subnet_with_dns_and_route(self): |
|
self.skipTest("Skip test for not implemented host_routes feature") |
|
|
|
def test_update_subnet_route(self): |
|
self.skipTest("Skip test for not implemented host_routes feature") |
|
|
|
def test_update_subnet_route_to_None(self): |
|
self.skipTest("Skip test for not implemented host_routes feature") |
|
|
|
def test_create_subnet_with_one_host_route(self): |
|
self.skipTest("Skip test for not implemented host_routes feature") |
|
|
|
def test_create_subnet_nonzero_cidr(self): |
|
awkward_cidrs = [{'nonezero': '10.129.122.5/8', |
|
'corrected': '10.0.0.0/8'}, |
|
{'nonezero': '11.129.122.5/15', |
|
'corrected': '11.128.0.0/15'}, |
|
{'nonezero': '12.129.122.5/16', |
|
'corrected': '12.129.0.0/16'}, |
|
{'nonezero': '13.129.122.5/18', |
|
'corrected': '13.129.64.0/18'}, |
|
{'nonezero': '14.129.122.5/22', |
|
'corrected': '14.129.120.0/22'}, |
|
{'nonezero': '15.129.122.5/24', |
|
'corrected': '15.129.122.0/24'}, |
|
{'nonezero': '16.129.122.5/28', |
|
'corrected': '16.129.122.0/28'}, ] |
|
|
|
for cidr in awkward_cidrs: |
|
with self.subnet(enable_dhcp=False, |
|
cidr=cidr['nonezero']) as subnet: |
|
# the API should accept and correct these cidrs for users |
|
self.assertEqual(cidr['corrected'], |
|
subnet['subnet']['cidr']) |
|
|
|
with self.subnet(enable_dhcp=False, cidr='17.129.122.5/32', |
|
gateway_ip=None) as subnet: |
|
self.assertEqual('17.129.122.5/32', subnet['subnet']['cidr']) |
|
|
|
def test_create_subnet_ipv6_attributes(self): |
|
# Expected to fail for now as we dont't support IPv6 for NSXv |
|
cidr = "fe80::/80" |
|
with testlib_api.ExpectedException( |
|
webob.exc.HTTPClientError) as ctx_manager: |
|
self._test_create_subnet(cidr=cidr) |
|
self.assertEqual(ctx_manager.exception.code, 400) |
|
|
|
def test_create_subnet_with_different_dhcp_server(self): |
|
self.mock_create_dhcp_service.stop() |
|
name = 'dvs-provider-net' |
|
providernet_args = {pnet.NETWORK_TYPE: 'vlan', |
|
pnet.SEGMENTATION_ID: 43, |
|
pnet.PHYSICAL_NETWORK: 'dvs-uuid'} |
|
with self.network(name=name, do_delete=False, |
|
providernet_args=providernet_args, |
|
arg_list=(pnet.NETWORK_TYPE, |
|
pnet.SEGMENTATION_ID, |
|
pnet.PHYSICAL_NETWORK)) as net: |
|
self._test_create_subnet(network=net, cidr='10.0.0.0/24') |
|
dhcp_router_id = (vcns_const.DHCP_EDGE_PREFIX + |
|
net['network']['id'])[:36] |
|
dhcp_server_id = nsxv_db.get_nsxv_router_binding( |
|
self.context.session, dhcp_router_id)['edge_id'] |
|
providernet_args_1 = {pnet.NETWORK_TYPE: 'vlan', |
|
pnet.SEGMENTATION_ID: 43, |
|
pnet.PHYSICAL_NETWORK: 'dvs-uuid-1'} |
|
with self.network(name=name, do_delete=False, |
|
providernet_args=providernet_args_1, |
|
arg_list=(pnet.NETWORK_TYPE, |
|
pnet.SEGMENTATION_ID, |
|
pnet.PHYSICAL_NETWORK)) as net1: |
|
self._test_create_subnet(network=net1, cidr='10.0.1.0/24') |
|
router_id = (vcns_const.DHCP_EDGE_PREFIX + |
|
net1['network']['id'])[:36] |
|
dhcp_server_id_1 = nsxv_db.get_nsxv_router_binding( |
|
self.context.session, router_id)['edge_id'] |
|
self.assertNotEqual(dhcp_server_id, dhcp_server_id_1) |
|
|
|
def test_create_subnet_with_different_dhcp_by_flat_net(self): |
|
self.mock_create_dhcp_service.stop() |
|
name = 'flat-net' |
|
providernet_args = {pnet.NETWORK_TYPE: 'flat', |
|
pnet.PHYSICAL_NETWORK: 'dvs-uuid'} |
|
with self.network(name=name, do_delete=False, |
|
providernet_args=providernet_args, |
|
arg_list=(pnet.NETWORK_TYPE, |
|
pnet.PHYSICAL_NETWORK)) as net: |
|
self._test_create_subnet(network=net, cidr='10.0.0.0/24') |
|
dhcp_router_id = (vcns_const.DHCP_EDGE_PREFIX + |
|
net['network']['id'])[:36] |
|
dhcp_server_id = nsxv_db.get_nsxv_router_binding( |
|
self.context.session, dhcp_router_id)['edge_id'] |
|
providernet_args_1 = {pnet.NETWORK_TYPE: 'flat', |
|
pnet.PHYSICAL_NETWORK: 'dvs-uuid'} |
|
with self.network(name=name, do_delete=False, |
|
providernet_args=providernet_args_1, |
|
arg_list=(pnet.NETWORK_TYPE, |
|
pnet.PHYSICAL_NETWORK)) as net1: |
|
self._test_create_subnet(network=net1, cidr='10.0.1.0/24') |
|
router_id = (vcns_const.DHCP_EDGE_PREFIX + |
|
net1['network']['id'])[:36] |
|
dhcp_server_id_1 = nsxv_db.get_nsxv_router_binding( |
|
self.context.session, router_id)['edge_id'] |
|
self.assertNotEqual(dhcp_server_id, dhcp_server_id_1) |
|
|
|
def test_create_subnet_ipv6_slaac_with_db_reference_error(self): |
|
self.skipTest('Currently not support') |
|
|
|
def test_create_subnet_ipv6_gw_values(self): |
|
# This test should fail with response code 400 as IPv6 subnets with |
|
# DHCP are not supported by this plugin |
|
with testlib_api.ExpectedException( |
|
webob.exc.HTTPClientError) as ctx_manager: |
|
super(TestSubnetsV2, self).test_create_subnet_ipv6_gw_values() |
|
self.assertEqual(ctx_manager.exception.code, 400) |
|
|
|
|
|
class TestBasicGet(test_plugin.TestBasicGet, NsxVPluginV2TestCase): |
|
pass |
|
|
|
|
|
class TestV2HTTPResponse(test_plugin.TestV2HTTPResponse, NsxVPluginV2TestCase): |
|
pass |
|
|
|
|
|
class TestL3ExtensionManager(object): |
|
|
|
def get_resources(self): |
|
# Simulate extension of L3 attribute map |
|
# First apply attribute extensions |
|
for key in l3.RESOURCE_ATTRIBUTE_MAP.keys(): |
|
l3.RESOURCE_ATTRIBUTE_MAP[key].update( |
|
l3_ext_gw_mode.EXTENDED_ATTRIBUTES_2_0.get(key, {})) |
|
l3.RESOURCE_ATTRIBUTE_MAP[key].update( |
|
dist_router.EXTENDED_ATTRIBUTES_2_0.get(key, {})) |
|
l3.RESOURCE_ATTRIBUTE_MAP[key].update( |
|
router_type.EXTENDED_ATTRIBUTES_2_0.get(key, {})) |
|
# Finally add l3 resources to the global attribute map |
|
attributes.RESOURCE_ATTRIBUTE_MAP.update( |
|
l3.RESOURCE_ATTRIBUTE_MAP) |
|
return l3.L3.get_resources() |
|
|
|
def get_actions(self): |
|
return [] |
|
|
|
def get_request_extensions(self): |
|
return [] |
|
|
|
|
|
def backup_l3_attribute_map(): |
|
"""Return a backup of the original l3 attribute map.""" |
|
return dict((res, attrs.copy()) for |
|
(res, attrs) in six.iteritems(l3.RESOURCE_ATTRIBUTE_MAP)) |
|
|
|
|
|
def restore_l3_attribute_map(map_to_restore): |
|
"""Ensure changes made by fake ext mgrs are reverted.""" |
|
l3.RESOURCE_ATTRIBUTE_MAP = map_to_restore |
|
|
|
|
|
class L3NatTest(test_l3_plugin.L3BaseForIntTests, NsxVPluginV2TestCase): |
|
|
|
def _restore_l3_attribute_map(self): |
|
l3.RESOURCE_ATTRIBUTE_MAP = self._l3_attribute_map_bk |
|
|
|
def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None, service_plugins=None): |
|
self._l3_attribute_map_bk = {} |
|
for item in l3.RESOURCE_ATTRIBUTE_MAP: |
|
self._l3_attribute_map_bk[item] = ( |
|
l3.RESOURCE_ATTRIBUTE_MAP[item].copy()) |
|
cfg.CONF.set_override('task_status_check_interval', 200, group="nsxv") |
|
|
|
cfg.CONF.set_override('api_extensions_path', vmware.NSXEXT_PATH) |
|
l3_attribute_map_bk = backup_l3_attribute_map() |
|
self.addCleanup(restore_l3_attribute_map, l3_attribute_map_bk) |
|
ext_mgr = ext_mgr or TestL3ExtensionManager() |
|
super(L3NatTest, self).setUp( |
|
plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins) |
|
self.plugin_instance = manager.NeutronManager.get_plugin() |
|
self._plugin_name = "%s.%s" % ( |
|
self.plugin_instance.__module__, |
|
self.plugin_instance.__class__.__name__) |
|
self._plugin_class = self.plugin_instance.__class__ |
|
|
|
def tearDown(self): |
|
plugin = manager.NeutronManager.get_plugin() |
|
_manager = plugin.nsx_v.task_manager |
|
# wait max ~10 seconds for all tasks to be finished |
|
for i in range(100): |
|
if not _manager.has_pending_task(): |
|
break |
|
greenthread.sleep(0.1) |
|
if _manager.has_pending_task(): |
|
_manager.show_pending_tasks() |
|
raise Exception(_("Tasks not completed")) |
|
_manager.stop() |
|
# Ensure the manager thread has been stopped |
|
self.assertIsNone(_manager._thread) |
|
super(L3NatTest, self).tearDown() |
|
|
|
def _create_l3_ext_network(self, vlan_id=None): |
|
name = 'l3_ext_net' |
|
return self.network(name=name, |
|
router__external=True) |
|
|
|
def _create_router(self, fmt, tenant_id, name=None, |
|
admin_state_up=None, set_context=False, |
|
arg_list=None, **kwargs): |
|
data = {'router': {'tenant_id': tenant_id}} |
|
if name: |
|
data['router']['name'] = name |
|
if admin_state_up: |
|
data['router']['admin_state_up'] = admin_state_up |
|
for arg in (('admin_state_up', 'tenant_id') + (arg_list or ())): |
|
# Arg must be present and not empty |
|
if kwargs.get(arg): |
|
data['router'][arg] = kwargs[arg] |
|
router_req = self.new_create_request('routers', data, fmt) |
|
if set_context and tenant_id: |
|
# create a specific auth context for this request |
|
router_req.environ['neutron.context'] = context.Context( |
|
'', tenant_id) |
|
|
|
return router_req.get_response(self.ext_api) |
|
|
|
def _make_router(self, fmt, tenant_id, name=None, admin_state_up=None, |
|
external_gateway_info=None, set_context=False, |
|
arg_list=None, **kwargs): |
|
if external_gateway_info: |
|
arg_list = ('external_gateway_info', ) + (arg_list or ()) |
|
res = self._create_router(fmt, tenant_id, name, |
|
admin_state_up, set_context, |
|
arg_list=arg_list, |
|
external_gateway_info=external_gateway_info, |
|
**kwargs) |
|
return self.deserialize(fmt, res) |
|
|
|
@contextlib.contextmanager |
|
def router(self, name=None, admin_state_up=True, |
|
fmt=None, tenant_id=_uuid(), |
|
external_gateway_info=None, set_context=False, |
|
**kwargs): |
|
# avoid name duplication of edge |
|
if not name: |
|
name = _uuid() |
|
router = self._make_router(fmt or self.fmt, tenant_id, name, |
|
admin_state_up, external_gateway_info, |
|
set_context, **kwargs) |
|
yield router |
|
|
|
def _recursive_sort_list(self, lst): |
|
sorted_list = [] |
|
for ele in lst: |
|
if isinstance(ele, list): |
|
sorted_list.append(self._recursive_sort_list(ele)) |
|
elif isinstance(ele, dict): |
|
sorted_list.append(self._recursive_sort_dict(ele)) |
|
else: |
|
sorted_list.append(ele) |
|
return sorted(sorted_list) |
|
|
|
def _recursive_sort_dict(self, dct): |
|
sorted_dict = {} |
|
for k, v in dct.items(): |
|
if isinstance(v, list): |
|
sorted_dict[k] = self._recursive_sort_list(v) |
|
elif isinstance(v, dict): |
|
sorted_dict[k] = self._recursive_sort_dict(v) |
|
else: |
|
sorted_dict[k] = v |
|
return sorted_dict |
|
|
|
def _update_router_enable_snat(self, router_id, network_id, enable_snat): |
|
return self._update('routers', router_id, |
|
{'router': {'external_gateway_info': |
|
{'network_id': network_id, |
|
'enable_snat': enable_snat}}}) |
|
|
|
|
|
class L3NatTestCaseBase(test_l3_plugin.L3NatTestCaseMixin): |
|
|
|
def test_floatingip_multi_external_one_internal(self): |
|
with self.subnet(cidr="10.0.0.0/24", |
|
enable_dhcp=False) as ext1,\ |
|
self.subnet(cidr="11.0.0.0/24", |
|
enable_dhcp=False) as ext2,\ |
|
self.subnet(cidr="12.0.0.0/24", |
|
enable_dhcp=False) as inter1: |
|
|
|
network_ex_id1 = ext1['subnet']['network_id'] |
|
network_ex_id2 = ext2['subnet']['network_id'] |
|
self._set_net_external(network_ex_id1) |
|
self._set_net_external(network_ex_id2) |
|
r2i_fixed_ips = [{'ip_address': '12.0.0.2'}] |
|
|
|
with self.router(no_delete=True) as r1,\ |
|
self.router(no_delete=True) as r2,\ |
|
self.port(subnet=inter1, fixed_ips=r2i_fixed_ips) as r2i: |
|
|
|
self._add_external_gateway_to_router( |
|
r1['router']['id'], |
|
network_ex_id1) |
|
self._router_interface_action('add', r1['router']['id'], |
|
inter1['subnet']['id'], |
|
None) |
|
self._add_external_gateway_to_router( |
|
r2['router']['id'], |
|
network_ex_id2) |
|
self._router_interface_action('add', r2['router']['id'], |
|
None, |
|
r2i['port']['id']) |
|
|
|
with self.port(subnet=inter1, |
|
fixed_ips=[{'ip_address': '12.0.0.3'}] |
|
) as private_port: |
|
|
|
fp1 = self._make_floatingip(self.fmt, network_ex_id1, |
|
private_port['port']['id'], |
|
floating_ip='10.0.0.3') |
|
fp2 = self._make_floatingip(self.fmt, network_ex_id2, |
|
private_port['port']['id'], |
|
floating_ip='11.0.0.3') |
|
self.assertEqual(fp1['floatingip']['router_id'], |
|
r1['router']['id']) |
|
self.assertEqual(fp2['floatingip']['router_id'], |
|
r2['router']['id']) |
|
|
|
def test_create_floatingip_with_multisubnet_id(self): |
|
with self.network() as network: |
|
self._set_net_external(network['network']['id']) |
|
with self.subnet(network, |
|
enable_dhcp=False, |
|
cidr='10.0.12.0/24') as subnet1: |
|
with self.subnet(network, |
|
enable_dhcp=False, |
|
cidr='10.0.13.0/24') as subnet2: |
|
with self.router(): |
|
res = self._create_floatingip( |
|
self.fmt, |
|
subnet1['subnet']['network_id'], |
|
subnet_id=subnet1['subnet']['id']) |
|
fip1 = self.deserialize(self.fmt, res) |
|
res = self._create_floatingip( |
|
self.fmt, |
|
subnet1['subnet']['network_id'], |
|
subnet_id=subnet2['subnet']['id']) |
|
fip2 = self.deserialize(self.fmt, res) |
|
self.assertTrue( |
|
fip1['floatingip']['floating_ip_address'].startswith('10.0.12')) |
|
self.assertTrue( |
|
fip2['floatingip']['floating_ip_address'].startswith('10.0.13')) |
|
|
|
def test_create_floatingip_with_wrong_subnet_id(self): |
|
with self.network() as network1: |
|
self._set_net_external(network1['network']['id']) |
|
with self.subnet(network1, |
|
enable_dhcp=False, |
|
cidr='10.0.12.0/24') as subnet1: |
|
with self.network() as network2: |
|
self._set_net_external(network2['network']['id']) |
|
with self.subnet(network2, |
|
enable_dhcp=False, |
|
cidr='10.0.13.0/24') as subnet2: |
|
with self.router(): |
|
res = self._create_floatingip( |
|
self.fmt, |
|
subnet1['subnet']['network_id'], |
|
subnet_id=subnet2['subnet']['id']) |
|
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) |
|
|
|
def test_router_update_gateway_upon_subnet_create_ipv6(self): |
|
with self.network() as n: |
|
with self.subnet(network=n, enable_dhcp=False) as s1,\ |
|
self.router() as r: |
|
self._set_net_external(n['network']['id']) |
|
res1 = self._add_external_gateway_to_router( |
|
r['router']['id'], |
|
n['network']['id'], |
|
ext_ips=[{'subnet_id': s1['subnet']['id']}]) |
|
fip1 = (res1['router']['external_gateway_info'] |
|
['external_fixed_ips'][0]) |
|
sres = self._create_subnet(self.fmt, net_id=n['network']['id'], |
|
ip_version=6, cidr='2001:db8::/32', |
|
enable_dhcp=False, |
|
expected_res_status=( |
|
webob.exc.HTTPCreated.code)) |
|
s2 = self.deserialize(self.fmt, sres) |
|
res2 = self._show('routers', r['router']['id']) |
|
self.assertEqual(fip1, res2['router']['external_gateway_info'] |
|
['external_fixed_ips'][0]) |
|
fip2 = (res2['router']['external_gateway_info'] |
|
['external_fixed_ips'][1]) |
|
self.assertEqual(s2['subnet']['id'], fip2['subnet_id']) |
|
self.assertNotEqual(fip1['subnet_id'], fip2['subnet_id']) |
|
self.assertNotEqual(fip1['ip_address'], fip2['ip_address']) |
|
|
|
@mock.patch.object(edge_utils, "update_firewall") |
|
def test_router_set_gateway_with_nosnat(self, mock): |
|
expected_fw = [{'action': 'allow', |
|
'enabled': True, |
|
'source_ip_address': [], |
|
'destination_ip_address': []}] |
|
nosnat_fw = [{'action': 'allow', |
|
'enabled': True, |
|
'source_vnic_groups': ["external"], |
|
'destination_ip_address': []}] |
|
|
|
with self.router() as r1,\ |
|
self.subnet() as ext_subnet,\ |
|
self.subnet(cidr='11.0.0.0/24') as s1,\ |
|
self.subnet(cidr='12.0.0.0/24') as s2: |
|
self._set_net_external(ext_subnet['subnet']['network_id']) |
|
|
|
self._router_interface_action( |
|
'add', r1['router']['id'], |
|
s1['subnet']['id'], None) |
|
expected_fw[0]['source_ip_address'] = ['11.0.0.0/24'] |
|
expected_fw[0]['destination_ip_address'] = ['11.0.0.0/24'] |
|
fw_rules = mock.call_args[0][3]['firewall_rule_list'] |
|
self.assertEqual(self._recursive_sort_list(expected_fw), |
|
self._recursive_sort_list(fw_rules)) |
|
self._add_external_gateway_to_router( |
|
r1['router']['id'], |
|
ext_subnet['subnet']['network_id']) |
|
fw_rules = mock.call_args[0][3]['firewall_rule_list'] |
|
self.assertEqual(self._recursive_sort_list(expected_fw), |
|
self._recursive_sort_list(fw_rules)) |
|
self._update_router_enable_snat( |
|
r1['router']['id'], |
|
ext_subnet['subnet']['network_id'], |
|
False) |
|
nosnat_fw[0]['destination_ip_address'] = ['11.0.0.0/24'] |
|
fw_rules = mock.call_args[0][3]['firewall_rule_list'] |
|
self.assertEqual( |
|
self._recursive_sort_list(expected_fw + nosnat_fw), |
|
self._recursive_sort_list(fw_rules)) |
|
self._router_interface_action('add', |
|
r1['router']['id'], |
|
s2['subnet']['id'], |
|
None) |
|
expected_fw[0]['source_ip_address'] = ['12.0.0.0/24', |
|
'11.0.0.0/24'] |
|
expected_fw[0]['destination_ip_address'] = ['12.0.0.0/24', |
|
'11.0.0.0/24'] |
|
nosnat_fw[0]['destination_ip_address'] = ['11.0.0.0/24', |
|
'12.0.0.0/24'] |
|
fw_rules = mock.call_args[0][3]['firewall_rule_list'] |
|
self.assertEqual( |
|
self._recursive_sort_list(expected_fw + nosnat_fw), |
|
self._recursive_sort_list(fw_rules)) |
|
self._router_interface_action('remove', |
|
r1['router']['id'], |
|
s1['subnet']['id'], |
|
None) |
|
expected_fw[0]['source_ip_address'] = ['12.0.0.0/24'] |
|
expected_fw[0]['destination_ip_address'] = ['12.0.0.0/24'] |
|
nosnat_fw[0]['destination_ip_address'] = ['12.0.0.0/24'] |
|
fw_rules = mock.call_args[0][3]['firewall_rule_list'] |
|
self.assertEqual( |
|
self._recursive_sort_list(expected_fw + nosnat_fw), |
|
self._recursive_sort_list(fw_rules)) |
|
self._update_router_enable_snat( |
|
r1['router']['id'], |
|
ext_subnet['subnet']['network_id'], |
|
True) |
|
fw_rules = mock.call_args[0][3]['firewall_rule_list'] |
|
self.assertEqual( |
|
self._recursive_sort_list(expected_fw), |
|
self._recursive_sort_list(fw_rules)) |
|
self._router_interface_action('remove', |
|
r1['router']['id'], |
|
s2['subnet']['id'], |
|
None) |
|
self._remove_external_gateway_from_router( |
|
r1['router']['id'], |
|
ext_subnet['subnet']['network_id']) |
|
|
|
def test_router_add_interface_port_bad_tenant_returns_404(self): |
|
self.skipTest('TBD') |
|
|
|
def test_router_add_interface_subnet_with_bad_tenant_returns_404(self): |
|
self.skipTest('TBD') |
|
|
|
|
|
class IPv6ExpectedFailuresTestMixin(object): |
|
|
|
def test_router_add_interface_ipv6_subnet(self): |
|
# Expect a 400 statuc code as IPv6 subnets w/DHCP are not supported |
|
with testlib_api.ExpectedException( |
|
webob.exc.HTTPClientError) as ctx_manager: |
|
super(IPv6ExpectedFailuresTestMixin, self).\ |
|
test_router_add_interface_ipv6_subnet() |
|
self.assertEqual(ctx_manager.exception.code, 400) |
|
|
|
def test_router_add_iface_ipv6_ext_ra_subnet_returns_400(self): |
|
# This returns a 400 too, but as an exception is raised the response |
|
# code need to be asserted differently |
|
with testlib_api.ExpectedException( |
|
webob.exc.HTTPClientError) as ctx_manager: |
|
super(IPv6ExpectedFailuresTestMixin, self).\ |
|
test_router_add_iface_ipv6_ext_ra_subnet_returns_400() |
|
self.assertEqual(ctx_manager.exception.code, 400) |
|
|
|
def test_router_add_gateway_multiple_subnets_ipv6(self): |
|
# Expect a 400 statuc code as IPv6 subnets w/DHCP are not supported |
|
with testlib_api.ExpectedException( |
|
webob.exc.HTTPClientError) as ctx_manager: |
|
super(IPv6ExpectedFailuresTestMixin, self).\ |
|
test_router_add_gateway_multiple_subnets_ipv6() |
|
self.assertEqual(ctx_manager.exception.code, 400) |
|
|
|
|
|
class TestExclusiveRouterTestCase(L3NatTest, L3NatTestCaseBase, |
|
test_l3_plugin.L3NatDBIntTestCase, |
|
IPv6ExpectedFailuresTestMixin, |
|
NsxVPluginV2TestCase): |
|
|
|
def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None, service_plugins=None): |
|
super(TestExclusiveRouterTestCase, self).setUp( |
|
plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins) |
|
self.plugin_instance.nsx_v.is_subnet_in_use = mock.Mock() |
|
self.plugin_instance.nsx_v.is_subnet_in_use.return_value = False |
|
|
|
def _create_router(self, fmt, tenant_id, name=None, |
|
admin_state_up=None, set_context=False, |
|
arg_list=None, **kwargs): |
|
data = {'router': {'tenant_id': tenant_id}} |
|
if name: |
|
data['router']['name'] = name |
|
if admin_state_up: |
|
data['router']['admin_state_up'] = admin_state_up |
|
for arg in (('admin_state_up', 'tenant_id') + (arg_list or ())): |
|
# Arg must be present and not empty |
|
if arg in kwargs and kwargs[arg]: |
|
data['router'][arg] = kwargs[arg] |
|
|
|
data['router']['router_type'] = kwargs.get('router_type', 'exclusive') |
|
|
|
router_req = self.new_create_request('routers', data, fmt) |
|
if set_context and tenant_id: |
|
# create a specific auth context for this request |
|
router_req.environ['neutron.context'] = context.Context( |
|
'', tenant_id) |
|
|
|
return router_req.get_response(self.ext_api) |
|
|
|
def _test_create_l3_ext_network(self, vlan_id=0): |
|
name = 'l3_ext_net' |
|
expected = [('subnets', []), ('name', name), ('admin_state_up', True), |
|
('status', 'ACTIVE'), ('shared', False), |
|
(external_net.EXTERNAL, True)] |
|
with self._create_l3_ext_network(vlan_id) as net: |
|
for k, v in expected: |
|
self.assertEqual(net['network'][k], v) |
|
|
|
def test_create_router_fail_at_the_backend(self): |
|
p = manager.NeutronManager.get_plugin() |
|
edge_manager = p.edge_manager |
|
with mock.patch.object(edge_manager, 'create_lrouter', |
|
side_effect=[n_exc.NeutronException]): |
|
router = {'router': {'admin_state_up': True, |
|
'name': 'e161be1d-0d0d-4046-9823-5a593d94f72c', |
|
'router_type': 'exclusive'}} |
|
self.assertRaises(n_exc.NeutronException, |
|
p.create_router, |
|
context.get_admin_context(), |
|
router) |
|
self._test_list_resources('router', ()) |
|
|
|
def test_create_l3_ext_network_with_dhcp(self): |
|
with self._create_l3_ext_network() as net: |
|
with testlib_api.ExpectedException( |
|
webob.exc.HTTPClientError) as ctx_manager: |
|
with self.subnet(network=net): |
|
self.assertEqual(ctx_manager.exception.code, 400) |
|
|
|
def test_create_l3_ext_network_without_vlan(self): |
|
self._test_create_l3_ext_network() |
|
|
|
def _test_router_create_with_gwinfo_and_l3_ext_net(self, vlan_id=None, |
|
validate_ext_gw=False): |
|
with self._create_l3_ext_network(vlan_id) as net: |
|
with self.subnet(network=net, enable_dhcp=False) as s: |
|
data = {'router': {'tenant_id': 'whatever'}} |
|
data['router']['name'] = 'router1' |
|
data['router']['external_gateway_info'] = { |
|
'network_id': s['subnet']['network_id']} |
|
router_req = self.new_create_request('routers', data, |
|
self.fmt) |
|
res = router_req.get_response(self.ext_api) |
|
router = self.deserialize(self.fmt, res) |
|
self.assertEqual( |
|
s['subnet']['network_id'], |
|
(router['router']['external_gateway_info'] |
|
['network_id'])) |
|
if validate_ext_gw: |
|
pass |
|
|
|
def test_router_create_with_gwinfo_and_l3_ext_net(self): |
|
self._test_router_create_with_gwinfo_and_l3_ext_net() |
|
|
|
def test_router_create_with_gwinfo_and_l3_ext_net_with_vlan(self): |
|
self._test_router_create_with_gwinfo_and_l3_ext_net(444) |
|
|
|
def test_router_add_gateway_invalid_network_returns_404(self): |
|
# NOTE(salv-orlando): This unit test has been overriden |
|
# as the nsx plugin support the ext_gw_mode extension |
|
# which mandates a uuid for the external network identifier |
|
with self.router() as r: |
|
self._add_external_gateway_to_router( |
|
r['router']['id'], |
|
uuidutils.generate_uuid(), |
|
expected_code=webob.exc.HTTPNotFound.code) |
|
|
|
def _test_router_update_gateway_on_l3_ext_net(self, vlan_id=None, |
|
validate_ext_gw=False, |
|
distributed=False): |
|
with self.router( |
|
arg_list=('distributed',), distributed=distributed) as r: |
|
with self.subnet() as s1: |
|
with self._create_l3_ext_network(vlan_id) as net: |
|
with self.subnet(network=net, enable_dhcp=False) as s2: |
|
self._set_net_external(s1['subnet']['network_id']) |
|
try: |
|
self._add_external_gateway_to_router( |
|
r['router']['id'], |
|
s1['subnet']['network_id']) |
|
body = self._show('routers', r['router']['id']) |
|
net_id = (body['router'] |
|
['external_gateway_info']['network_id']) |
|
self.assertEqual(net_id, |
|
s1['subnet']['network_id']) |
|
# Plug network with external mapping |
|
self._set_net_external(s2['subnet']['network_id']) |
|
self._add_external_gateway_to_router( |
|
r['router']['id'], |
|
s2['subnet']['network_id']) |
|
body = self._show('routers', r['router']['id']) |
|
net_id = (body['router'] |
|
['external_gateway_info']['network_id']) |
|
self.assertEqual(net_id, |
|
s2['subnet']['network_id']) |
|
if validate_ext_gw: |
|
pass |
|
finally: |
|
# Cleanup |
|
self._remove_external_gateway_from_router( |
|
r['router']['id'], |
|
s2['subnet']['network_id']) |
|
|
|
def test_router_update_gateway_on_l3_ext_net(self): |
|
self._test_router_update_gateway_on_l3_ext_net() |
|
|
|
def test_router_update_gateway_on_l3_ext_net_with_vlan(self): |
|
self._test_router_update_gateway_on_l3_ext_net(444) |
|
|
|
def test_router_update_gateway_with_existing_floatingip(self): |
|
with self._create_l3_ext_network() as net: |
|
with self.subnet(network=net, enable_dhcp=False) as subnet: |
|
with self.floatingip_with_assoc() as fip: |
|
self._add_external_gateway_to_router( |
|
fip['floatingip']['router_id'], |
|
subnet['subnet']['network_id'], |
|
expected_code=webob.exc.HTTPConflict.code) |
|
|
|
def test_router_list_by_tenant_id(self): |
|
with self.router(), self.router(): |
|
with self.router(tenant_id='custom') as router: |
|
self._test_list_resources('router', [router], |
|
query_params="tenant_id=custom") |
|
|
|
def test_create_l3_ext_network_with_vlan(self): |
|
self._test_create_l3_ext_network(666) |
|
|
|
def test_floatingip_with_assoc_fails(self): |
|
self._test_floatingip_with_assoc_fails( |
|
self._plugin_name + '._check_and_get_fip_assoc') |
|
|
|
def test_floatingip_with_invalid_create_port(self): |
|
self._test_floatingip_with_invalid_create_port(self._plugin_name) |
|
|
|
def test_floatingip_update(self): |
|
super(TestExclusiveRouterTestCase, self).test_floatingip_update( |
|
constants.FLOATINGIP_STATUS_DOWN) |
|
|
|
def test_floatingip_disassociate(self): |
|
with self.port() as p: |
|
private_sub = {'subnet': {'id': |
|
p['port']['fixed_ips'][0]['subnet_id']}} |
|
with self.floatingip_no_assoc(private_sub) as fip: |
|
self.assertEqual(fip['floatingip']['status'], |
|
constants.FLOATINGIP_STATUS_DOWN) |
|
port_id = p['port']['id'] |
|
body = self._update('floatingips', fip['floatingip']['id'], |
|
{'floatingip': {'port_id': port_id}}) |
|
self.assertEqual(body['floatingip']['port_id'], port_id) |
|
self.assertEqual(body['floatingip']['status'], |
|
constants.FLOATINGIP_STATUS_ACTIVE) |
|
# Disassociate |
|
body = self._update('floatingips', fip['floatingip']['id'], |
|
{'floatingip': {'port_id': None}}) |
|
body = self._show('floatingips', fip['floatingip']['id']) |
|
self.assertIsNone(body['floatingip']['port_id']) |
|
self.assertIsNone(body['floatingip']['fixed_ip_address']) |
|
self.assertEqual(body['floatingip']['status'], |
|
constants.FLOATINGIP_STATUS_DOWN) |
|
|
|
def test_update_floatingip_with_edge_router_update_failure(self): |
|
p = manager.NeutronManager.get_plugin() |
|
with self.subnet() as subnet,\ |
|
self.port(subnet=subnet) as p1,\ |
|
self.port(subnet=subnet) as p2: |
|
p1_id = p1['port']['id'] |
|
p2_id = p2['port']['id'] |
|
with self.floatingip_with_assoc(port_id=p1_id) as fip: |
|
with self._mock_edge_router_update_with_exception(): |
|
self.assertRaises(object, |
|
p.update_floatingip, |
|
context.get_admin_context(), |
|
fip['floatingip']['id'], |
|
floatingip={'floatingip': |
|
{'port_id': p2_id}}) |
|
res = self._list( |
|
'floatingips', query_params="port_id=%s" % p1_id) |
|
self.assertEqual(len(res['floatingips']), 1) |
|
res = self._list( |
|
'floatingips', query_params="port_id=%s" % p2_id) |
|
self.assertEqual(len(res['floatingips']), 0) |
|
|
|
def test_create_floatingip_with_edge_router_update_failure(self): |
|
p = manager.NeutronManager.get_plugin() |
|
with self.subnet(cidr='200.0.0.0/24') as public_sub: |
|
public_network_id = public_sub['subnet']['network_id'] |
|
self._set_net_external(public_network_id) |
|
with self.port() as private_port: |
|
port_id = private_port['port']['id'] |
|
tenant_id = private_port['port']['tenant_id'] |
|
subnet_id = private_port['port']['fixed_ips'][0]['subnet_id'] |
|
with self.router() as r: |
|
self._add_external_gateway_to_router( |
|
r['router']['id'], |
|
public_sub['subnet']['network_id']) |
|
self._router_interface_action('add', |
|
r['router']['id'], |
|
subnet_id, |
|
None) |
|
floatingip = {'floatingip': { |
|
'tenant_id': tenant_id, |
|
'floating_network_id': public_network_id, |
|
'port_id': port_id}} |
|
|
|
with self._mock_edge_router_update_with_exception(): |
|
self.assertRaises(object, |
|
p.create_floatingip, |
|
context.get_admin_context(), |
|
floatingip=floatingip) |
|
res = self._list( |
|
'floatingips', query_params="port_id=%s" % port_id) |
|
self.assertEqual(len(res['floatingips']), 0) |
|
# Cleanup |
|
self._router_interface_action('remove', |
|
r['router']['id'], |
|
subnet_id, |
|
None) |
|
self._remove_external_gateway_from_router( |
|
r['router']['id'], public_network_id) |
|
|
|
@contextlib.contextmanager |
|
def _mock_edge_router_update_with_exception(self): |
|
nsx_router_update = PLUGIN_NAME + '._update_edge_router' |
|
with mock.patch(nsx_router_update) as update_edge: |
|
update_edge.side_effect = object() |
|
yield update_edge |
|
|
|
@mock.patch.object(edge_utils, "update_firewall") |
|
def test_router_interfaces_with_update_firewall(self, mock): |
|
s1_cidr = '10.0.0.0/24' |
|
s2_cidr = '11.0.0.0/24' |
|
with self.router() as r,\ |
|
self.subnet(cidr=s1_cidr) as s1,\ |
|
self.subnet(cidr=s2_cidr) as s2: |
|
|
|
self._router_interface_action('add', |
|
r['router']['id'], |
|
s1['subnet']['id'], |
|
None) |
|
self._router_interface_action('add', |
|
r['router']['id'], |
|
s2['subnet']['id'], |
|
None) |
|
expected_cidrs = [s1_cidr, s2_cidr] |
|
expected_fw = [{'action': 'allow', |
|
'enabled': True, |
|
'source_ip_address': expected_cidrs, |
|
'destination_ip_address': expected_cidrs}] |
|
fw_rules = mock.call_args[0][3]['firewall_rule_list'] |
|
self.assertEqual(self._recursive_sort_list(expected_fw), |
|
self._recursive_sort_list(fw_rules)) |
|
self._router_interface_action('remove', |
|
r['router']['id'], |
|
s1['subnet']['id'], |
|
None) |
|
self._router_interface_action('remove', |
|
r['router']['id'], |
|
s2['subnet']['id'], |
|
None) |
|
|
|
@mock.patch.object(edge_utils, "update_firewall") |
|
def test_router_interfaces_different_tenants_update_firewall(self, mock): |
|
tenant_id = _uuid() |
|
other_tenant_id = _uuid() |
|
s1_cidr = '10.0.0.0/24' |
|
s2_cidr = '11.0.0.0/24' |
|
with self.router(tenant_id=tenant_id) as r,\ |
|
self.network(tenant_id=tenant_id) as n1,\ |
|
self.network(tenant_id=other_tenant_id) as n2,\ |
|
self.subnet(network=n1, cidr=s1_cidr) as s1,\ |
|
self.subnet(network=n2, cidr=s2_cidr) as s2: |
|
|
|
self._router_interface_action('add', |
|
r['router']['id'], |
|
s2['subnet']['id'], |
|
None) |
|
self._router_interface_action('add', |
|
r['router']['id'], |
|
s1['subnet']['id'], |
|
None, |
|
tenant_id=tenant_id) |
|
expected_cidrs = [s1_cidr, s2_cidr] |
|
expected_fw = [{'action': 'allow', |
|
'enabled': True, |
|
'source_ip_address': expected_cidrs, |
|
'destination_ip_address': expected_cidrs}] |
|
fw_rules = mock.call_args[0][3]['firewall_rule_list'] |
|
self.assertEqual(self._recursive_sort_list(expected_fw), |
|
self._recursive_sort_list(fw_rules)) |
|
self._router_interface_action('remove', |
|
r['router']['id'], |
|
s1['subnet']['id'], |
|
None, |
|
tenant_id=tenant_id) |
|
self._router_interface_action('remove', |
|
r['router']['id'], |
|
s2['subnet']['id'], |
|
None) |
|
expected_fw = [] |
|
fw_rules = mock.call_args[0][3]['firewall_rule_list'] |
|
self.assertEqual(expected_fw, fw_rules) |
|
|
|
def test_delete_ext_net_with_disassociated_floating_ips(self): |
|