# Copyright (c) 2015 OpenStack Foundation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. from unittest import mock import netaddr from neutron.db import l3_db from neutron.db import models_v2 from neutron.db import securitygroups_db as sg_db from neutron.extensions import address_scope from neutron.extensions import l3 from neutron.extensions import securitygroup as secgrp from neutron.tests.unit import _test_extension_portbindings as test_bindings from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin from neutron.tests.unit.extensions import test_address_scope from neutron.tests.unit.extensions import test_extra_dhcp_opt as test_dhcpopts from neutron.tests.unit.extensions import test_extraroute as test_ext_route from neutron.tests.unit.extensions import test_l3 as test_l3_plugin from neutron.tests.unit.extensions \ import test_l3_ext_gw_mode as test_ext_gw_mode from neutron.tests.unit.scheduler \ import test_dhcp_agent_scheduler as test_dhcpagent from neutron.tests.unit import testlib_api from neutron_lib.api.definitions import external_net as extnet_apidef from neutron_lib.api.definitions import extraroute as xroute_apidef from neutron_lib.api.definitions import l3_ext_gw_mode as l3_egm_apidef from neutron_lib.api.definitions import port_security as psec from neutron_lib.api.definitions import portbindings from neutron_lib.api.definitions import provider_net as pnet from neutron_lib.api.definitions import vlantransparent as vlan_apidef from neutron_lib.callbacks import events from neutron_lib.callbacks import exceptions as nc_exc from neutron_lib.callbacks import registry from neutron_lib.callbacks import resources from neutron_lib import constants from neutron_lib import context from neutron_lib import exceptions as n_exc from neutron_lib.plugins import directory from neutron_lib.plugins import utils as plugin_utils from oslo_config import cfg from oslo_db import exception as db_exc from oslo_utils import uuidutils from webob import exc from vmware_nsx.api_client import exception as api_exc from vmware_nsx.common import utils from vmware_nsx.db import db as nsx_db from vmware_nsx.plugins.nsx_v3 import plugin as nsx_plugin from vmware_nsx.services.lbaas.nsx_v3.implementation import loadbalancer_mgr from vmware_nsx.services.lbaas.octavia import octavia_listener from vmware_nsx.tests import unit as vmware from vmware_nsx.tests.unit.common_plugin import common_v3 from vmware_nsx.tests.unit.extensions import test_metadata from vmware_nsxlib.tests.unit.v3 import mocks as nsx_v3_mocks from vmware_nsxlib.tests.unit.v3 import nsxlib_testcase from vmware_nsxlib.v3 import exceptions as nsxlib_exc from vmware_nsxlib.v3 import nsx_constants PLUGIN_NAME = 'vmware_nsx.plugin.NsxV3Plugin' NSX_TZ_NAME = 'default transport zone' NSX_DHCP_PROFILE_ID = 'default dhcp profile' NSX_METADATA_PROXY_ID = 'default metadata proxy' NSX_SWITCH_PROFILE = 'dummy switch profile' NSX_DHCP_RELAY_SRV = 'dhcp relay srv' NSX_EDGE_CLUSTER_UUID = 'dummy edge cluster' def _mock_create_firewall_rules(*args): # NOTE(arosen): the code in the neutron plugin expects the # neutron rule id as the display_name. rules = args[4] return { 'rules': [ {'display_name': rule['id'], 'id': uuidutils.generate_uuid()} for rule in rules ]} def _return_id_key(*args, **kwargs): return {'id': uuidutils.generate_uuid()} def _return_id_key_list(*args, **kwargs): return [{'id': uuidutils.generate_uuid()}] def _mock_add_rules_in_section(*args): # NOTE(arosen): the code in the neutron plugin expects the # neutron rule id as the display_name. rules = args[0] return { 'rules': [ {'display_name': rule['display_name'], 'id': uuidutils.generate_uuid()} for rule in rules ]} def _mock_nsx_backend_calls(): mock.patch("vmware_nsxlib.v3.client.NSX3Client").start() fake_profile = {'key': 'FakeKey', 'resource_type': 'FakeResource', 'id': uuidutils.generate_uuid()} def _return_id(*args, **kwargs): return uuidutils.generate_uuid() def _return_same(key, *args, **kwargs): return key mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibSwitchingProfile." "find_by_display_name", return_value=[fake_profile] ).start() mock.patch( "vmware_nsxlib.v3.router.RouterLib.validate_tier0").start() mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibSwitchingProfile." "create_port_mirror_profile", side_effect=_return_id_key).start() mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibBridgeEndpoint.create", side_effect=_return_id_key).start() mock.patch( "vmware_nsxlib.v3.security.NsxLibNsGroup.find_by_display_name", side_effect=_return_id_key_list).start() mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.create", side_effect=_return_id_key).start() mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibDhcpProfile." "get_id_by_name_or_id", return_value=NSX_DHCP_PROFILE_ID).start() mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibDhcpRelayService." "get_id_by_name_or_id", return_value=NSX_DHCP_RELAY_SRV).start() mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibMetadataProxy." "get_id_by_name_or_id", side_effect=_return_same).start() mock.patch( "vmware_nsxlib.v3.resources.LogicalPort.create", side_effect=_return_id_key).start() mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibLogicalRouter.create", side_effect=_return_id_key).start() mock.patch( "vmware_nsxlib.v3.resources.LogicalDhcpServer.create", side_effect=_return_id_key).start() mock.patch( "vmware_nsxlib.v3.resources.LogicalDhcpServer.create_binding", side_effect=_return_id_key).start() mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibLogicalRouter." "get_firewall_section_id", side_effect=_return_id_key).start() mock.patch( "vmware_nsxlib.v3.NsxLib.get_version", return_value='3.1.0').start() mock.patch( "vmware_nsxlib.v3.load_balancer.Service.get_router_lb_service", return_value=None).start() mock.patch('vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='OVERLAY').start() mock.patch("vmware_nsxlib.v3.core_resources.NsxLibEdgeCluster." "get_transport_nodes", return_value=['dummy']).start() mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportNode." "get_transport_zones", return_value=[NSX_TZ_NAME, mock.ANY]).start() mock.patch("vmware_nsxlib.v3.security.NsxLibFirewallSection.add_rules", side_effect=_mock_add_rules_in_section).start() class NsxV3PluginTestCaseMixin(test_plugin.NeutronDbPluginV2TestCase, nsxlib_testcase.NsxClientTestCase): def setup_conf_overrides(self): cfg.CONF.set_override('default_overlay_tz', NSX_TZ_NAME, 'nsx_v3') cfg.CONF.set_override('native_dhcp_metadata', False, 'nsx_v3') cfg.CONF.set_override('dhcp_profile', NSX_DHCP_PROFILE_ID, 'nsx_v3') cfg.CONF.set_override('metadata_proxy', NSX_METADATA_PROXY_ID, 'nsx_v3') cfg.CONF.set_override( 'network_scheduler_driver', 'neutron.scheduler.dhcp_agent_scheduler.AZAwareWeightScheduler') def mock_plugin_methods(self): # need to mock the global placeholder. This is due to the fact that # the generic security group tests assume that there is just one # security group. mock_ensure_global_sg_placeholder = mock.patch.object( nsx_plugin.NsxV3Plugin, '_ensure_global_sg_placeholder') mock_ensure_global_sg_placeholder.start() mock.patch( 'neutron_lib.rpc.Connection.consume_in_threads', return_value=[]).start() mock.patch.object(nsx_plugin.NsxV3Plugin, '_cleanup_duplicates').start() def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None, service_plugins=None, **kwargs): self._patchers = [] _mock_nsx_backend_calls() self.setup_conf_overrides() self.mock_get_edge_cluster = mock.patch.object( nsx_plugin.NsxV3Plugin, '_get_edge_cluster', return_value=NSX_EDGE_CLUSTER_UUID) self.mock_get_edge_cluster.start() self.mock_plugin_methods() # ignoring the given plugin and use the nsx-v3 one if not plugin.endswith('NsxTVDPlugin'): plugin = PLUGIN_NAME super(NsxV3PluginTestCaseMixin, self).setUp(plugin=plugin, ext_mgr=ext_mgr) self.maxDiff = None def tearDown(self): for patcher in self._patchers: patcher.stop() super(NsxV3PluginTestCaseMixin, self).tearDown() def _create_network(self, fmt, name, admin_state_up, arg_list=None, providernet_args=None, set_context=False, tenant_id=None, **kwargs): tenant_id = tenant_id or self._tenant_id data = {'network': {'name': name, 'admin_state_up': admin_state_up, 'tenant_id': tenant_id}} # Fix to allow the router:external attribute and any other # attributes containing a colon to be passed with # a double underscore instead kwargs = dict((k.replace('__', ':'), v) for k, v in kwargs.items()) if extnet_apidef.EXTERNAL in kwargs: arg_list = (extnet_apidef.EXTERNAL, ) + (arg_list or ()) if providernet_args: kwargs.update(providernet_args) for arg in (('admin_state_up', 'tenant_id', 'shared', 'availability_zone_hints') + (arg_list or ())): # Arg must be present if arg in kwargs: data['network'][arg] = kwargs[arg] network_req = self.new_create_request('networks', data, fmt) if set_context and tenant_id: # create a specific auth context for this request network_req.environ['neutron.context'] = context.Context( '', tenant_id) return network_req.get_response(self.api) def _create_l3_ext_network( self, physical_network=nsx_v3_mocks.DEFAULT_TIER0_ROUTER_UUID): name = 'l3_ext_net' net_type = utils.NetworkTypes.L3_EXT providernet_args = {pnet.NETWORK_TYPE: net_type, pnet.PHYSICAL_NETWORK: physical_network} return self.network(name=name, router__external=True, providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK)) def _save_networks(self, networks): ctx = context.get_admin_context() for network_id in networks: with ctx.session.begin(subtransactions=True): ctx.session.add(models_v2.Network(id=network_id)) def _initialize_azs(self): self.plugin.init_availability_zones() self.plugin._translate_configured_names_to_uuids() def _enable_native_dhcp_md(self): cfg.CONF.set_override('native_dhcp_metadata', True, 'nsx_v3') cfg.CONF.set_override('dhcp_agent_notification', False) self.plugin._init_dhcp_metadata() def _enable_dhcp_relay(self): # Add the relay service to the config and availability zones cfg.CONF.set_override('dhcp_relay_service', NSX_DHCP_RELAY_SRV, 'nsx_v3') mock_nsx_version = mock.patch.object( self.plugin.nsxlib, 'feature_supported', return_value=True) mock_nsx_version.start() self._initialize_azs() self._enable_native_dhcp_md() class TestNetworksV2(test_plugin.TestNetworksV2, NsxV3PluginTestCaseMixin): def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None, service_plugins=None): # add vlan transparent to the configuration cfg.CONF.set_override('vlan_transparent', True) super(TestNetworksV2, self).setUp(plugin=plugin, ext_mgr=ext_mgr) def tearDown(self): super(TestNetworksV2, self).tearDown() @mock.patch.object(nsx_plugin.NsxV3Plugin, 'validate_availability_zones') def test_create_network_with_availability_zone(self, mock_validate_az): name = 'net-with-zone' zone = ['zone1'] mock_validate_az.return_value = None with self.network(name=name, availability_zone_hints=zone) as net: az_hints = net['network']['availability_zone_hints'] self.assertListEqual(az_hints, zone) def test_network_failure_rollback(self): self._enable_native_dhcp_md() self.plugin = directory.get_plugin() with mock.patch.object(self.plugin.nsxlib.logical_port, 'create', side_effect=api_exc.NsxApiException): self.network() ctx = context.get_admin_context() networks = self.plugin.get_networks(ctx) self.assertListEqual([], networks) def test_create_provider_flat_network(self): providernet_args = {pnet.NETWORK_TYPE: 'flat'} with mock.patch('vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.' 'create', side_effect=_return_id_key) as nsx_create, \ mock.patch('vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.' 'delete') as nsx_delete, \ mock.patch('vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='VLAN'),\ self.network(name='flat_net', providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, )) as net: self.assertEqual('flat', net['network'].get(pnet.NETWORK_TYPE)) # make sure the network is created at the backend nsx_create.assert_called_once() # Delete the network and make sure it is deleted from the backend req = self.new_delete_request('networks', net['network']['id']) res = req.get_response(self.api) self.assertEqual(exc.HTTPNoContent.code, res.status_int) nsx_delete.assert_called_once() def test_create_provider_flat_network_with_physical_net(self): physical_network = nsx_v3_mocks.DEFAULT_TIER0_ROUTER_UUID providernet_args = {pnet.NETWORK_TYPE: 'flat', pnet.PHYSICAL_NETWORK: physical_network} with mock.patch( 'vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='VLAN'),\ self.network(name='flat_net', providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK)) as net: self.assertEqual('flat', net['network'].get(pnet.NETWORK_TYPE)) def test_create_provider_flat_network_with_vlan(self): providernet_args = {pnet.NETWORK_TYPE: 'flat', pnet.SEGMENTATION_ID: 11} with mock.patch('vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='VLAN'): result = self._create_network(fmt='json', name='bad_flat_net', admin_state_up=True, providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, pnet.SEGMENTATION_ID)) data = self.deserialize('json', result) # should fail self.assertEqual('InvalidInput', data['NeutronError']['type']) def test_create_provider_geneve_network(self): providernet_args = {pnet.NETWORK_TYPE: 'geneve'} with mock.patch('vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.' 'create', side_effect=_return_id_key) as nsx_create, \ mock.patch('vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.' 'delete') as nsx_delete, \ mock.patch('vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='OVERLAY'),\ self.network(name='geneve_net', providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, )) as net: self.assertEqual('geneve', net['network'].get(pnet.NETWORK_TYPE)) # make sure the network is created at the backend nsx_create.assert_called_once() # Delete the network and make sure it is deleted from the backend req = self.new_delete_request('networks', net['network']['id']) res = req.get_response(self.api) self.assertEqual(exc.HTTPNoContent.code, res.status_int) nsx_delete.assert_called_once() def test_create_provider_geneve_network_with_physical_net(self): physical_network = nsx_v3_mocks.DEFAULT_TIER0_ROUTER_UUID providernet_args = {pnet.NETWORK_TYPE: 'geneve', pnet.PHYSICAL_NETWORK: physical_network} with mock.patch( 'vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='OVERLAY'),\ self.network(name='geneve_net', providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, )) as net: self.assertEqual('geneve', net['network'].get(pnet.NETWORK_TYPE)) def test_create_provider_geneve_network_with_vlan(self): providernet_args = {pnet.NETWORK_TYPE: 'geneve', pnet.SEGMENTATION_ID: 11} with mock.patch( 'vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='OVERLAY'): result = self._create_network(fmt='json', name='bad_geneve_net', admin_state_up=True, providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, pnet.SEGMENTATION_ID)) data = self.deserialize('json', result) # should fail self.assertEqual('InvalidInput', data['NeutronError']['type']) def test_create_provider_vlan_network(self): providernet_args = {pnet.NETWORK_TYPE: 'vlan', pnet.SEGMENTATION_ID: 11} with mock.patch('vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.' 'create', side_effect=_return_id_key) as nsx_create, \ mock.patch('vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.' 'delete') as nsx_delete, \ mock.patch('vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='VLAN'),\ self.network(name='vlan_net', providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, pnet.SEGMENTATION_ID)) as net: self.assertEqual('vlan', net['network'].get(pnet.NETWORK_TYPE)) # make sure the network is created at the backend nsx_create.assert_called_once() # Delete the network and make sure it is deleted from the backend req = self.new_delete_request('networks', net['network']['id']) res = req.get_response(self.api) self.assertEqual(exc.HTTPNoContent.code, res.status_int) nsx_delete.assert_called_once() def test_create_provider_nsx_network(self): physical_network = 'Fake logical switch' providernet_args = {pnet.NETWORK_TYPE: 'nsx-net', pnet.PHYSICAL_NETWORK: physical_network} with mock.patch( 'vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.create', side_effect=nsxlib_exc.ResourceNotFound) as nsx_create, \ mock.patch('vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.' 'delete') as nsx_delete, \ self.network(name='nsx_net', providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK)) as net: self.assertEqual('nsx-net', net['network'].get(pnet.NETWORK_TYPE)) self.assertEqual(physical_network, net['network'].get(pnet.PHYSICAL_NETWORK)) # make sure the network is NOT created at the backend nsx_create.assert_not_called() # Delete the network. It should NOT deleted from the backend req = self.new_delete_request('networks', net['network']['id']) res = req.get_response(self.api) self.assertEqual(exc.HTTPNoContent.code, res.status_int) nsx_delete.assert_not_called() def test_create_provider_bad_nsx_network(self): physical_network = 'Bad logical switch' providernet_args = {pnet.NETWORK_TYPE: 'nsx-net', pnet.PHYSICAL_NETWORK: physical_network} with mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.get", side_effect=nsxlib_exc.ResourceNotFound): result = self._create_network(fmt='json', name='bad_nsx_net', admin_state_up=True, providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK)) data = self.deserialize('json', result) # should fail self.assertEqual('InvalidInput', data['NeutronError']['type']) def test_create_ens_network_with_no_port_sec(self): cfg.CONF.set_override('ens_support', True, 'nsx_v3') providernet_args = {psec.PORTSECURITY: False} with mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportZone." "get_host_switch_mode", return_value="ENS"),\ mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.get", return_value={'transport_zone_id': 'xxx'}): result = self._create_network(fmt='json', name='ens_net', admin_state_up=True, providernet_args=providernet_args, arg_list=(psec.PORTSECURITY,)) res = self.deserialize('json', result) # should succeed, and net should have port security disabled self.assertFalse(res['network']['port_security_enabled']) def test_create_ens_network_with_port_sec_supported(self): cfg.CONF.set_override('ens_support', True, 'nsx_v3') providernet_args = {psec.PORTSECURITY: True} with mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportZone." "get_host_switch_mode", return_value="ENS"),\ mock.patch("vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch." "get", return_value={'transport_zone_id': 'xxx'}): result = self._create_network(fmt='json', name='ens_net', admin_state_up=True, providernet_args=providernet_args, arg_list=(psec.PORTSECURITY,)) res = self.deserialize('json', result) # should succeed self.assertTrue(res['network'][psec.PORTSECURITY]) def test_create_ens_network_disable_default_port_security(self): cfg.CONF.set_override('ens_support', True, 'nsx_v3') cfg.CONF.set_override('disable_port_security_for_ens', True, 'nsx_v3') mock_ens = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibTransportZone' '.get_host_switch_mode', return_value='ENS') mock_tz = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibLogicalSwitch.get', return_value={'transport_zone_id': 'xxx'}) mock_tt = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibTransportZone' '.get_transport_type', return_value='VLAN') data = {'network': { 'name': 'portsec_net', 'admin_state_up': True, 'shared': False, 'tenant_id': 'some_tenant', 'provider:network_type': 'flat', 'provider:physical_network': 'xxx', 'port_security_enabled': True}} with mock_ens, mock_tz, mock_tt: self.plugin.create_network(context.get_admin_context(), data) def test_create_ens_network_with_qos(self): cfg.CONF.set_override('ens_support', True, 'nsx_v3') mock_ens = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibTransportZone' '.get_host_switch_mode', return_value='ENS') mock_tz = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibLogicalSwitch.get', return_value={'transport_zone_id': 'xxx'}) mock_tt = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibTransportZone' '.get_transport_type', return_value='VLAN') mock_ver = mock.patch("vmware_nsxlib.v3.NsxLib.get_version", return_value='2.4.0') policy_id = uuidutils.generate_uuid() data = {'network': { 'name': 'qos_net', 'tenant_id': 'some_tenant', 'provider:network_type': 'flat', 'provider:physical_network': 'xxx', 'qos_policy_id': policy_id, 'port_security_enabled': False}} with mock_ens, mock_tz, mock_tt, mock_ver, mock.patch.object( self.plugin, '_validate_qos_policy_id'): self.assertRaises(n_exc.InvalidInput, self.plugin.create_network, context.get_admin_context(), data) def test_update_ens_network_with_qos(self): cfg.CONF.set_override('ens_support', True, 'nsx_v3') mock_ens = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibTransportZone' '.get_host_switch_mode', return_value='ENS') mock_tz = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibLogicalSwitch.get', return_value={'transport_zone_id': 'xxx'}) mock_tt = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibTransportZone' '.get_transport_type', return_value='VLAN') mock_ver = mock.patch("vmware_nsxlib.v3.NsxLib.get_version", return_value='2.4.0') data = {'network': { 'name': 'qos_net', 'tenant_id': 'some_tenant', 'provider:network_type': 'flat', 'provider:physical_network': 'xxx', 'admin_state_up': True, 'shared': False, 'port_security_enabled': False}} with mock_ens, mock_tz, mock_tt, mock_ver,\ mock.patch.object(self.plugin, '_validate_qos_policy_id'): network = self.plugin.create_network(context.get_admin_context(), data) policy_id = uuidutils.generate_uuid() data = {'network': { 'id': network['id'], 'admin_state_up': True, 'shared': False, 'port_security_enabled': False, 'tenant_id': 'some_tenant', 'qos_policy_id': policy_id}} self.assertRaises(n_exc.InvalidInput, self.plugin.update_network, context.get_admin_context(), network['id'], data) def test_update_ens_network_psec_supported(self): cfg.CONF.set_override('ens_support', True, 'nsx_v3') providernet_args = {psec.PORTSECURITY: False} with mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportZone." "get_host_switch_mode", return_value="ENS"),\ mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.get", return_value={'transport_zone_id': 'xxx'}): result = self._create_network(fmt='json', name='ens_net', admin_state_up=True, providernet_args=providernet_args, arg_list=(psec.PORTSECURITY,)) net = self.deserialize('json', result) net_id = net['network']['id'] args = {'network': {psec.PORTSECURITY: True}} req = self.new_update_request('networks', args, net_id, fmt='json') res = self.deserialize('json', req.get_response(self.api)) # should succeed self.assertTrue(res['network'][psec.PORTSECURITY]) def test_create_transparent_vlan_network(self): providernet_args = {vlan_apidef.VLANTRANSPARENT: True} with mock.patch( 'vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='OVERLAY'),\ self.network(name='vt_net', providernet_args=providernet_args, arg_list=(vlan_apidef.VLANTRANSPARENT, )) as net: self.assertTrue(net['network'].get(vlan_apidef.VLANTRANSPARENT)) def test_create_provider_vlan_network_with_transparent(self): providernet_args = {pnet.NETWORK_TYPE: 'vlan', vlan_apidef.VLANTRANSPARENT: True} with mock.patch('vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='VLAN'): result = self._create_network(fmt='json', name='badvlan_net', admin_state_up=True, providernet_args=providernet_args, arg_list=( pnet.NETWORK_TYPE, pnet.SEGMENTATION_ID, vlan_apidef.VLANTRANSPARENT)) data = self.deserialize('json', result) self.assertEqual('vlan', data['network'].get(pnet.NETWORK_TYPE)) def _test_generate_tag(self, vlan_id): net_type = 'vlan' name = 'phys_net' plugin = directory.get_plugin() plugin._network_vlans = plugin_utils.parse_network_vlan_ranges( cfg.CONF.nsx_v3.network_vlan_ranges) expected = [('subnets', []), ('name', name), ('admin_state_up', True), ('status', 'ACTIVE'), ('shared', False), (pnet.NETWORK_TYPE, net_type), (pnet.PHYSICAL_NETWORK, 'fb69d878-958e-4f32-84e4-50286f26226b'), (pnet.SEGMENTATION_ID, vlan_id)] providernet_args = {pnet.NETWORK_TYPE: net_type, pnet.PHYSICAL_NETWORK: 'fb69d878-958e-4f32-84e4-50286f26226b'} gtt_path = "vmware_nsxlib.v3.core_resources." \ "NsxLibTransportZone.get_transport_type" with mock.patch(gtt_path, return_value='VLAN'): with self.network(name=name, providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK)) as net: for k, v in expected: self.assertEqual(net['network'][k], v) def test_create_phys_vlan_generate(self): cfg.CONF.set_override('network_vlan_ranges', 'fb69d878-958e-4f32-84e4-50286f26226b', 'nsx_v3') self._test_generate_tag(1) def test_create_phys_vlan_generate_range(self): cfg.CONF.set_override('network_vlan_ranges', 'fb69d878-958e-4f32-84e4-' '50286f26226b:100:110', 'nsx_v3') self._test_generate_tag(100) def test_create_phys_vlan_network_outofrange_returns_503(self): cfg.CONF.set_override('network_vlan_ranges', 'fb69d878-958e-4f32-84e4-' '50286f26226b:9:10', 'nsx_v3') self._test_generate_tag(9) self._test_generate_tag(10) with testlib_api.ExpectedException(exc.HTTPClientError) as ctx_manager: self._test_generate_tag(11) self.assertEqual(ctx_manager.exception.code, 503) def test_update_external_flag_on_net(self): with self.network() as net: # should fail to update the network to external args = {'network': {'router:external': 'True'}} req = self.new_update_request('networks', args, net['network']['id'], fmt='json') res = self.deserialize('json', req.get_response(self.api)) self.assertEqual('InvalidInput', res['NeutronError']['type']) def test_network_update_external(self): # This plugin does not support updating the external flag of a network self.skipTest("UnSupported") def test_network_update_external_failure(self): data = {'network': {'name': 'net1', 'router:external': 'True', 'tenant_id': 'tenant_one', 'provider:physical_network': 'stam'}} network_req = self.new_create_request('networks', data) network = self.deserialize(self.fmt, network_req.get_response(self.api)) ext_net_id = network['network']['id'] # should fail to update the network to non-external args = {'network': {'router:external': 'False'}} req = self.new_update_request('networks', args, ext_net_id, fmt='json') res = self.deserialize('json', req.get_response(self.api)) self.assertEqual('InvalidInput', res['NeutronError']['type']) def test_update_network_rollback(self): with self.network() as net: # Fail the backend update with mock.patch("vmware_nsxlib.v3.core_resources." "NsxLibLogicalSwitch.update", side_effect=nsxlib_exc.InvalidInput): args = {'network': {'description': 'test rollback'}} req = self.new_update_request('networks', args, net['network']['id'], fmt='json') res = self.deserialize('json', req.get_response(self.api)) # should fail with the nsxlib error (meaning that the rollback # did not fail) self.assertEqual('InvalidInput', res['NeutronError']['type']) def test_update_network_port_sec(self): data = {'network': { 'name': 'psec_net', 'tenant_id': 'some_tenant', 'admin_state_up': True, 'shared': False, 'port_security_enabled': True}} network = self.plugin.create_network(context.get_admin_context(), data) self.assertEqual(True, network['port_security_enabled']) data = {'network': { 'id': network['id'], 'admin_state_up': True, 'shared': False, 'port_security_enabled': False, 'tenant_id': 'some_tenant'}} res = self.plugin.update_network(context.get_admin_context(), network['id'], data) self.assertEqual(False, res['port_security_enabled']) class TestSubnetsV2(common_v3.NsxV3TestSubnets, NsxV3PluginTestCaseMixin): def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None): super(TestSubnetsV2, self).setUp(plugin=plugin, ext_mgr=ext_mgr) def test_create_subnet_with_shared_address_space(self): with self.network() as network: data = {'subnet': {'network_id': network['network']['id'], 'cidr': '100.64.0.0/16', 'name': 'sub1', 'enable_dhcp': False, 'dns_nameservers': None, 'allocation_pools': None, 'tenant_id': 'tenant_one', 'host_routes': None, 'ip_version': 4}} self.assertRaises(n_exc.InvalidInput, self.plugin.create_subnet, context.get_admin_context(), data) def _create_external_network(self): data = {'network': {'name': 'net1', 'router:external': 'True', 'tenant_id': 'tenant_one', 'provider:physical_network': 'stam'}} network_req = self.new_create_request('networks', data) network = self.deserialize(self.fmt, network_req.get_response(self.api)) return network def test_create_subnet_with_conflicting_t0_address(self): network = self._create_external_network() data = {'subnet': {'network_id': network['network']['id'], 'cidr': '172.20.1.0/24', 'name': 'sub1', 'enable_dhcp': False, 'dns_nameservers': None, 'allocation_pools': None, 'tenant_id': 'tenant_one', 'host_routes': None, 'ip_version': 4}} ports = [{'subnets': [{'ip_addresses': [u'172.20.1.60'], 'prefix_length': 24}], 'resource_type': 'LogicalRouterUpLinkPort'}] with mock.patch.object(self.plugin.nsxlib.logical_router_port, 'get_by_router_id', return_value=ports): self.assertRaises(n_exc.InvalidInput, self.plugin.create_subnet, context.get_admin_context(), data) def test_subnet_native_dhcp_subnet_enabled(self): self._enable_native_dhcp_md() with self.network() as network: with mock.patch.object(self.plugin, '_enable_native_dhcp') as enable_dhcp,\ self.subnet(network=network, enable_dhcp=True): # Native dhcp should be set for this subnet self.assertTrue(enable_dhcp.called) def test_subnet_native_dhcp_subnet_disabled(self): self._enable_native_dhcp_md() with self.network() as network: with mock.patch.object(self.plugin, '_enable_native_dhcp') as enable_dhcp,\ self.subnet(network=network, enable_dhcp=False): # Native dhcp should not be set for this subnet self.assertFalse(enable_dhcp.called) def test_subnet_native_dhcp_with_relay(self): """Verify that the relay service is added to the router interface""" self._enable_dhcp_relay() with self.network() as network: with mock.patch.object(self.plugin, '_enable_native_dhcp') as enable_dhcp,\ self.subnet(network=network, enable_dhcp=True): # Native dhcp should not be set for this subnet self.assertFalse(enable_dhcp.called) def test_subnet_native_dhcp_flat_subnet_disabled(self): self._enable_native_dhcp_md() providernet_args = {pnet.NETWORK_TYPE: 'flat'} with mock.patch('vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='VLAN'): with self.network(name='flat_net', providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, )) as network: data = {'subnet': {'network_id': network['network']['id'], 'cidr': '172.20.1.0/24', 'name': 'sub1', 'enable_dhcp': False, 'dns_nameservers': None, 'allocation_pools': None, 'tenant_id': 'tenant_one', 'host_routes': None, 'ip_version': 4}} self.plugin.create_subnet( context.get_admin_context(), data) def test_subnet_native_dhcp_flat_subnet_enabled(self): self._enable_native_dhcp_md() providernet_args = {pnet.NETWORK_TYPE: 'flat'} with mock.patch('vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='VLAN'): with self.network(name='flat_net', providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, )) as network: data = {'subnet': {'network_id': network['network']['id'], 'cidr': '172.20.1.0/24', 'name': 'sub1', 'enable_dhcp': True, 'dns_nameservers': None, 'allocation_pools': None, 'tenant_id': 'tenant_one', 'host_routes': None, 'ip_version': 4}} self.assertRaises(n_exc.InvalidInput, self.plugin.create_subnet, context.get_admin_context(), data) def test_fail_create_static_routes_per_subnet_over_limit(self): with self.network() as network: data = {'subnet': {'network_id': network['network']['id'], 'cidr': '10.0.0.0/16', 'name': 'sub1', 'dns_nameservers': None, 'allocation_pools': None, 'tenant_id': 'tenant_one', 'enable_dhcp': False, 'ip_version': 4}} count = 1 host_routes = [] while count < nsx_constants.MAX_STATIC_ROUTES: host_routes.append("'host_routes': [{'destination': " "'135.207.0.0/%s', 'nexthop': " "'1.2.3.%s'}]" % (count, count)) count += 1 data['subnet']['host_routes'] = host_routes self.assertRaises(n_exc.InvalidInput, self.plugin.create_subnet, context.get_admin_context(), data) def test_create_subnet_disable_dhcp_with_host_route_fails(self): with self.network() as network: data = {'subnet': {'network_id': network['network']['id'], 'cidr': '172.20.1.0/24', 'name': 'sub1', 'dns_nameservers': None, 'allocation_pools': None, 'tenant_id': 'tenant_one', 'enable_dhcp': False, 'host_routes': [{ 'destination': '135.207.0.0/16', 'nexthop': '1.2.3.4'}], 'ip_version': 4}} self.assertRaises(n_exc.InvalidInput, self.plugin.create_subnet, context.get_admin_context(), data) def test_update_subnet_disable_dhcp_with_host_route_fails(self): with self.network() as network: data = {'subnet': {'network_id': network['network']['id'], 'cidr': '172.20.1.0/24', 'name': 'sub1', 'dns_nameservers': None, 'allocation_pools': None, 'tenant_id': 'tenant_one', 'enable_dhcp': True, 'host_routes': [{ 'destination': '135.207.0.0/16', 'nexthop': '1.2.3.4'}], 'ip_version': 4}} subnet = self.plugin.create_subnet( context.get_admin_context(), data) data['subnet']['enable_dhcp'] = False self.assertRaises(n_exc.InvalidInput, self.plugin.update_subnet, context.get_admin_context(), subnet['id'], data) def test_create_subnet_ipv6_gw_is_nw_start_addr(self): self.skipTest('No DHCP v6 Support yet') def test_create_subnet_ipv6_gw_is_nw_start_addr_canonicalize(self): self.skipTest('No DHCP v6 Support yet') def test_create_subnet_ipv6_gw_is_nw_end_addr(self): self.skipTest('No DHCP v6 Support yet') def test_create_subnet_ipv6_first_ip_owned_by_router(self): self.skipTest('No DHCP v6 Support yet') def test_create_subnet_ipv6_first_ip_owned_by_non_router(self): self.skipTest('No DHCP v6 Support yet') def test_create_subnet_with_v6_pd_allocation_pool(self): self.skipTest('No DHCP v6 Support yet') def test_create_subnet_with_v6_allocation_pool(self): self.skipTest('No DHCP v6 Support yet') def test_update_subnet_ipv6_ra_mode_fails(self): self.skipTest('No DHCP v6 Support yet') def test_create_subnet_ipv6_slaac_with_ip_already_allocated(self): self.skipTest('No DHCP v6 Support yet') def test_create_subnet_ipv6_slaac_with_db_reference_error(self): self.skipTest('No DHCP v6 Support yet') class TestPortsV2(common_v3.NsxV3SubnetMixin, common_v3.NsxV3TestPorts, NsxV3PluginTestCaseMixin, test_bindings.PortBindingsTestCase, test_bindings.PortBindingsHostTestCaseMixin, test_bindings.PortBindingsVnicTestCaseMixin): VIF_TYPE = portbindings.VIF_TYPE_OVS HAS_PORT_FILTER = True def setUp(self): cfg.CONF.set_override('switching_profiles', [NSX_SWITCH_PROFILE], 'nsx_v3') # add vlan transparent to the configuration cfg.CONF.set_override('vlan_transparent', True) super(TestPortsV2, self).setUp() self.plugin = directory.get_plugin() self.ctx = context.get_admin_context() def test_update_port_delete_ip(self): # This test case overrides the default because the nsx plugin # implements port_security/security groups and it is not allowed # to remove an ip address from a port unless the security group # is first removed. with self.subnet() as subnet: with self.port(subnet=subnet) as port: data = {'port': {'admin_state_up': False, 'fixed_ips': [], secgrp.SECURITYGROUPS: []}} req = self.new_update_request('ports', data, port['port']['id']) res = self.deserialize('json', req.get_response(self.api)) self.assertEqual(res['port']['admin_state_up'], data['port']['admin_state_up']) self.assertEqual(res['port']['fixed_ips'], data['port']['fixed_ips']) def test_delete_dhcp_port(self): self._enable_native_dhcp_md() with self.subnet(): pl = directory.get_plugin() ctx = context.Context(user_id=None, tenant_id=self._tenant_id, is_admin=False) ports = pl.get_ports( ctx, filters={'device_owner': [constants.DEVICE_OWNER_DHCP]}) req = self.new_delete_request('ports', ports[0]['id']) res = req.get_response(self.api) self.assertEqual(exc.HTTPBadRequest.code, res.status_int) def test_fail_create_port_with_ext_net(self): expected_error = 'InvalidInput' with self._create_l3_ext_network() as network: with self.subnet(network=network, cidr='10.0.0.0/24'): device_owner = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'X' res = self._create_port(self.fmt, network['network']['id'], exc.HTTPBadRequest.code, device_owner=device_owner) data = self.deserialize(self.fmt, res) self.assertEqual(expected_error, data['NeutronError']['type']) def test_fail_update_port_with_ext_net(self): with self._create_l3_ext_network() as network: with self.subnet(network=network, cidr='10.0.0.0/24') as subnet: with self.port(subnet=subnet) as port: device_owner = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'X' data = {'port': {'device_owner': device_owner}} req = self.new_update_request('ports', data, port['port']['id']) res = req.get_response(self.api) self.assertEqual(exc.HTTPBadRequest.code, res.status_int) def test_fail_update_lb_port_with_allowed_address_pairs(self): with self.network() as network: data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'pair_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': constants.DEVICE_OWNER_LOADBALANCERV2, 'fixed_ips': []} } port = self.plugin.create_port(self.ctx, data) data['port']['allowed_address_pairs'] = '10.0.0.1' self.assertRaises( n_exc.InvalidInput, self.plugin.update_port, self.ctx, port['id'], data) def test_fail_create_allowed_address_pairs_over_limit(self): with self.network() as network, self.subnet( network=network, enable_dhcp=True) as s1: data = { 'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'pair_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': 'fake_owner', 'fixed_ips': [{'subnet_id': s1['subnet']['id']}] } } count = 1 address_pairs = [] while count < 129: address_pairs.append({'ip_address': '10.0.0.%s' % count}) count += 1 data['port']['allowed_address_pairs'] = address_pairs self.assertRaises(n_exc.InvalidInput, self.plugin.create_port, self.ctx, data) def test_fail_update_lb_port_with_fixed_ip(self): with self.network() as network: data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'pair_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': constants.DEVICE_OWNER_LOADBALANCERV2, 'fixed_ips': []} } port = self.plugin.create_port(self.ctx, data) data['port']['fixed_ips'] = [{'ip_address': '10.0.0.1'}] self.assertRaises( n_exc.InvalidInput, self.plugin.update_port, self.ctx, port['id'], data) def test_create_port_with_qos(self): with self.network() as network: policy_id = uuidutils.generate_uuid() data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'qos_policy_id': policy_id, 'name': 'qos_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': 'fake_owner', 'fixed_ips': [], 'mac_address': '00:00:00:00:00:01'} } with mock.patch.object(self.plugin, '_get_qos_profile_id'),\ mock.patch.object(self.plugin, '_validate_qos_policy_id'): port = self.plugin.create_port(self.ctx, data) self.assertEqual(policy_id, port['qos_policy_id']) # Get port should also return the qos policy id with mock.patch('vmware_nsx.services.qos.common.utils.' 'get_port_policy_id', return_value=policy_id): port = self.plugin.get_port(self.ctx, port['id']) self.assertEqual(policy_id, port['qos_policy_id']) def test_update_port_with_qos(self): with self.network() as network: data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'qos_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': 'fake_owner', 'fixed_ips': [], 'mac_address': '00:00:00:00:00:01'} } port = self.plugin.create_port(self.ctx, data) policy_id = uuidutils.generate_uuid() data['port']['qos_policy_id'] = policy_id with mock.patch.object(self.plugin, '_get_qos_profile_id'),\ mock.patch.object(self.plugin, '_validate_qos_policy_id'): res = self.plugin.update_port(self.ctx, port['id'], data) self.assertEqual(policy_id, res['qos_policy_id']) # Get port should also return the qos policy id with mock.patch('vmware_nsx.services.qos.common.utils.' 'get_port_policy_id', return_value=policy_id): res = self.plugin.get_port(self.ctx, port['id']) self.assertEqual(policy_id, res['qos_policy_id']) # now remove the qos from the port data['port']['qos_policy_id'] = None res = self.plugin.update_port(self.ctx, port['id'], data) self.assertIsNone(res['qos_policy_id']) def test_create_ext_port_with_qos_fail(self): with self._create_l3_ext_network() as network: with self.subnet(network=network, cidr='10.0.0.0/24'),\ mock.patch.object(self.plugin, '_validate_qos_policy_id'): policy_id = uuidutils.generate_uuid() data = {'port': {'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'qos_policy_id': policy_id}} # Cannot add qos policy to a router port self.assertRaises(n_exc.InvalidInput, self.plugin.create_port, self.ctx, data) def _test_create_illegal_port_with_qos_fail(self, device_owner): with self.network() as network: with self.subnet(network=network, cidr='10.0.0.0/24'),\ mock.patch.object(self.plugin, '_validate_qos_policy_id'): policy_id = uuidutils.generate_uuid() data = {'port': {'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'device_owner': device_owner, 'qos_policy_id': policy_id}} # Cannot add qos policy to this type of port self.assertRaises(n_exc.InvalidInput, self.plugin.create_port, self.ctx, data) def test_create_port_ens_with_qos_fail(self): with self.network() as network: with self.subnet(network=network, cidr='10.0.0.0/24'): policy_id = uuidutils.generate_uuid() mock_ens = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibTransportZone' '.get_host_switch_mode', return_value='ENS') mock_tz = mock.patch('vmware_nsxlib.v3' '.core_resources' '.NsxLibLogicalSwitch.get', return_value={ 'transport_zone_id': 'xxx'}) mock_tt = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibTransportZone' '.get_transport_type', return_value='VLAN') mock_ver = mock.patch("vmware_nsxlib.v3.NsxLib.get_version", return_value='2.4.0') data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'qos_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': 'fake_owner', 'fixed_ips': [], 'port_security_enabled': False, 'mac_address': '00:00:00:00:00:01', 'qos_policy_id': policy_id} } # Cannot add qos policy to this type of port with mock_ens, mock_tz, mock_tt, mock_ver,\ mock.patch.object(self.plugin, '_validate_qos_policy_id'): self.assertRaises(n_exc.InvalidInput, self.plugin.create_port, self.ctx, data) def test_create_port_ens_with_sg(self): cfg.CONF.set_override('disable_port_security_for_ens', True, 'nsx_v3') with self.network() as network: with self.subnet(network=network, cidr='10.0.0.0/24'): mock_ens = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibTransportZone' '.get_host_switch_mode', return_value='ENS') mock_tz = mock.patch('vmware_nsxlib.v3' '.core_resources' '.NsxLibLogicalSwitch.get', return_value={ 'transport_zone_id': 'xxx'}) mock_tt = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibTransportZone' '.get_transport_type', return_value='VLAN') data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'sg_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': 'fake_owner', 'fixed_ips': [], 'mac_address': '00:00:00:00:00:01', 'port_security_enabled': True} } with mock_ens, mock_tz, mock_tt: self.plugin.create_port(self.ctx, data) def test_update_port_ens_with_qos_fail(self): with self.network() as network: with self.subnet(network=network, cidr='10.0.0.0/24'): policy_id = uuidutils.generate_uuid() mock_ens = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibTransportZone' '.get_host_switch_mode', return_value='ENS') mock_tz = mock.patch('vmware_nsxlib.v3' '.core_resources' '.NsxLibLogicalSwitch.get', return_value={ 'transport_zone_id': 'xxx'}) mock_tt = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibTransportZone' '.get_transport_type', return_value='VLAN') mock_ver = mock.patch("vmware_nsxlib.v3.NsxLib.get_version", return_value='2.4.0') data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'qos_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': 'fake_owner', 'fixed_ips': [], 'port_security_enabled': False, 'mac_address': '00:00:00:00:00:01'} } with mock_ens, mock_tz, mock_tt, mock_ver,\ mock.patch.object(self.plugin, '_validate_qos_policy_id'): port = self.plugin.create_port(self.ctx, data) data['port'] = {'qos_policy_id': policy_id} self.assertRaises(n_exc.InvalidInput, self.plugin.update_port, self.ctx, port['id'], data) def test_create_port_with_mac_learning_true(self): with self.network() as network: data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'qos_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': 'fake_owner', 'fixed_ips': [], 'port_security_enabled': False, 'mac_address': '00:00:00:00:00:01', 'mac_learning_enabled': True} } port = self.plugin.create_port(self.ctx, data) self.assertTrue(port['mac_learning_enabled']) def test_create_port_with_mac_learning_false(self): with self.network() as network: data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'qos_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': 'fake_owner', 'fixed_ips': [], 'port_security_enabled': False, 'mac_address': '00:00:00:00:00:01', 'mac_learning_enabled': False} } port = self.plugin.create_port(self.ctx, data) self.assertFalse(port['mac_learning_enabled']) def test_update_port_with_mac_learning_true(self): with self.network() as network: data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'qos_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': 'fake_owner', 'fixed_ips': [], 'port_security_enabled': False, 'mac_address': '00:00:00:00:00:01'} } port = self.plugin.create_port(self.ctx, data) data['port']['mac_learning_enabled'] = True update_res = self.plugin.update_port(self.ctx, port['id'], data) self.assertTrue(update_res['mac_learning_enabled']) def test_update_port_with_mac_learning_false(self): with self.network() as network: data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'qos_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': 'fake_owner', 'fixed_ips': [], 'port_security_enabled': False, 'mac_address': '00:00:00:00:00:01'} } port = self.plugin.create_port(self.ctx, data) data['port']['mac_learning_enabled'] = False update_res = self.plugin.update_port(self.ctx, port['id'], data) self.assertFalse(update_res['mac_learning_enabled']) def test_update_port_with_mac_learning_failes(self): with self.network() as network: data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'qos_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': constants.DEVICE_OWNER_FLOATINGIP, 'fixed_ips': [], 'port_security_enabled': False, 'mac_address': '00:00:00:00:00:01'} } port = self.plugin.create_port(self.ctx, data) data['port']['mac_learning_enabled'] = True self.assertRaises( n_exc.InvalidInput, self.plugin.update_port, self.ctx, port['id'], data) def test_create_router_port_with_qos_fail(self): self._test_create_illegal_port_with_qos_fail( 'network:router_interface') def test_create_dhcp_port_with_qos_fail(self): self._test_create_illegal_port_with_qos_fail('network:dhcp') def _test_update_illegal_port_with_qos_fail(self, device_owner): with self.network() as network: with self.subnet(network=network, cidr='10.0.0.0/24'),\ mock.patch.object(self.plugin, '_validate_qos_policy_id'): policy_id = uuidutils.generate_uuid() data = {'port': {'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'qos_port', 'admin_state_up': True, 'fixed_ips': [], 'mac_address': '00:00:00:00:00:01', 'device_id': 'dummy', 'device_owner': ''}} port = self.plugin.create_port(self.ctx, data) policy_id = uuidutils.generate_uuid() data['port'] = {'qos_policy_id': policy_id, 'device_owner': device_owner} # Cannot add qos policy to a router interface port self.assertRaises(n_exc.InvalidInput, self.plugin.update_port, self.ctx, port['id'], data) def test_update_router_port_with_qos_fail(self): self._test_update_illegal_port_with_qos_fail( 'network:router_interface') def test_update_dhcp_port_with_qos_fail(self): self._test_update_illegal_port_with_qos_fail('network:dhcp') def test_create_port_with_qos_on_net(self): with self.network() as network: policy_id = uuidutils.generate_uuid() device_owner = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'X' data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'qos_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': device_owner, 'fixed_ips': [], 'mac_address': '00:00:00:00:00:01'} } with mock.patch.object(self.plugin, '_get_qos_profile_id') as get_profile,\ mock.patch('vmware_nsx.services.qos.common.utils.' 'get_network_policy_id', return_value=policy_id),\ mock.patch.object(self.plugin, '_validate_qos_policy_id'): self.plugin.create_port(self.ctx, data) get_profile.assert_called_once_with(self.ctx, policy_id) def test_update_port_with_qos_on_net(self): with self.network() as network: data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'qos_port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': 'fake_owner', 'fixed_ips': [], 'mac_address': '00:00:00:00:00:01'} } port = self.plugin.create_port(self.ctx, data) policy_id = uuidutils.generate_uuid() device_owner = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'X' data['port']['device_owner'] = device_owner with mock.patch.object(self.plugin, '_get_qos_profile_id') as get_profile,\ mock.patch('vmware_nsx.services.qos.common.utils.' 'get_network_policy_id', return_value=policy_id),\ mock.patch.object(self.plugin, '_validate_qos_policy_id'): self.plugin.update_port(self.ctx, port['id'], data) get_profile.assert_called_once_with(self.ctx, policy_id) def _get_ports_with_fields(self, tenid, fields, expected_count): pl = directory.get_plugin() ctx = context.Context(user_id=None, tenant_id=tenid, is_admin=False) ports = pl.get_ports(ctx, filters={'tenant_id': [tenid]}, fields=fields) self.assertEqual(expected_count, len(ports)) def test_get_ports_with_fields(self): with self.port(), self.port(), self.port(), self.port() as p: tenid = p['port']['tenant_id'] # get all fields: self._get_ports_with_fields(tenid, None, 4) # get specific fields: self._get_ports_with_fields(tenid, 'mac_address', 4) self._get_ports_with_fields(tenid, 'network_id', 4) def test_list_ports_filtered_by_security_groups(self): ctx = context.get_admin_context() with self.port() as port1, self.port() as port2: query_params = "security_groups=%s" % ( port1['port']['security_groups'][0]) ports_data = self._list('ports', query_params=query_params) self.assertEqual(set([port1['port']['id'], port2['port']['id']]), set([port['id'] for port in ports_data['ports']])) query_params = "security_groups=%s&id=%s" % ( port1['port']['security_groups'][0], port1['port']['id']) ports_data = self._list('ports', query_params=query_params) self.assertEqual(port1['port']['id'], ports_data['ports'][0]['id']) self.assertEqual(1, len(ports_data['ports'])) temp_sg = {'security_group': {'tenant_id': 'some_tenant', 'name': '', 'description': 's'}} sg_dbMixin = sg_db.SecurityGroupDbMixin() sg = sg_dbMixin.create_security_group(ctx, temp_sg) sg_dbMixin._delete_port_security_group_bindings( ctx, port2['port']['id']) sg_dbMixin._create_port_security_group_binding( ctx, port2['port']['id'], sg['id']) port2['port']['security_groups'][0] = sg['id'] query_params = "security_groups=%s" % ( port1['port']['security_groups'][0]) ports_data = self._list('ports', query_params=query_params) self.assertEqual(port1['port']['id'], ports_data['ports'][0]['id']) self.assertEqual(1, len(ports_data['ports'])) query_params = "security_groups=%s" % ( (port2['port']['security_groups'][0])) ports_data = self._list('ports', query_params=query_params) self.assertEqual(port2['port']['id'], ports_data['ports'][0]['id']) def test_port_failure_rollback_dhcp_exception(self): self._enable_native_dhcp_md() self.plugin = directory.get_plugin() with mock.patch.object(self.plugin, '_add_port_mp_dhcp_binding', side_effect=nsxlib_exc.ManagerError): self.port() ctx = context.get_admin_context() networks = self.plugin.get_ports(ctx) self.assertListEqual([], networks) def test_port_DB_failure_rollback_dhcp_exception(self): self._enable_native_dhcp_md() self.plugin = directory.get_plugin() with mock.patch('vmware_nsx.db.db.add_neutron_nsx_dhcp_binding', side_effect=db_exc.DBError),\ mock.patch.object(self.plugin, '_enable_native_dhcp'),\ mock.patch('vmware_nsx.db.db.get_nsx_service_binding'),\ self.network() as network,\ self.subnet(network, cidr='10.0.1.0/24') as subnet: data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'p1', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': 'fake_owner', 'fixed_ips': [{'subnet_id': subnet['subnet']['id'], 'ip_address': '10.0.1.2'}], 'mac_address': '00:00:00:00:00:01'} } # making sure the port creation succeeded anyway created_port = self.plugin.create_port(self.ctx, data) self.assertEqual('fake_device', created_port['device_id']) def test_create_port_with_switching_profiles(self): """Tests that nsx ports get the configures switching profiles""" self.plugin = directory.get_plugin() with self.network() as network: data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'p1', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': 'fake_owner', 'fixed_ips': [], 'mac_address': '00:00:00:00:00:01'} } with mock.patch.object(self.plugin.nsxlib.logical_port, 'create', return_value={'id': 'fake'}) as nsx_create: self.plugin.create_port(self.ctx, data) expected_prof = self.plugin.get_default_az().\ switching_profiles_objs[0] actual_profs = nsx_create.call_args[1]['switch_profile_ids'] # the ports switching profiles should start with the # configured one self.assertEqual(expected_prof, actual_profs[0]) def test_create_ens_port_with_no_port_sec(self): with self.subnet() as subnet,\ mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportZone." "get_host_switch_mode", return_value="ENS"),\ mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.get", return_value={'transport_zone_id': 'xxx'}): args = {'port': {'network_id': subnet['subnet']['network_id'], 'tenant_id': subnet['subnet']['tenant_id'], 'fixed_ips': [{'subnet_id': subnet['subnet']['id']}], psec.PORTSECURITY: False}} port_req = self.new_create_request('ports', args) port = self.deserialize(self.fmt, port_req.get_response(self.api)) self.assertFalse(port['port']['port_security_enabled']) def test_create_ens_port_with_port_sec_supported(self): with self.subnet() as subnet,\ mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportZone." "get_host_switch_mode", return_value="ENS"),\ mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch.get", return_value={'transport_zone_id': 'xxx'}): args = {'port': {'network_id': subnet['subnet']['network_id'], 'tenant_id': subnet['subnet']['tenant_id'], 'fixed_ips': [{'subnet_id': subnet['subnet']['id']}], psec.PORTSECURITY: True}} port_req = self.new_create_request('ports', args) res = self.deserialize('json', port_req.get_response(self.api)) # should succeed self.assertTrue(res['port'][psec.PORTSECURITY]) def test_update_ens_port_psec_supported(self): with self.subnet() as subnet,\ mock.patch("vmware_nsxlib.v3.core_resources.NsxLibTransportZone." "get_host_switch_mode", return_value="ENS"),\ mock.patch("vmware_nsxlib.v3.core_resources.NsxLibLogicalSwitch." "get", return_value={'transport_zone_id': 'xxx'}): args = {'port': {'network_id': subnet['subnet']['network_id'], 'tenant_id': subnet['subnet']['tenant_id'], 'fixed_ips': [{'subnet_id': subnet['subnet']['id']}], psec.PORTSECURITY: False}} port_req = self.new_create_request('ports', args) port = self.deserialize(self.fmt, port_req.get_response(self.api)) port_id = port['port']['id'] args = {'port': {psec.PORTSECURITY: True}} req = self.new_update_request('ports', args, port_id) res = self.deserialize('json', req.get_response(self.api)) # should succeed self.assertTrue(res['port'][psec.PORTSECURITY]) def test_update_dhcp_port_device_owner(self): self._enable_native_dhcp_md() with self.subnet(): pl = directory.get_plugin() ctx = context.Context(user_id=None, tenant_id=self._tenant_id, is_admin=False) ports = pl.get_ports( ctx, filters={'device_owner': [constants.DEVICE_OWNER_DHCP]}) port_id = ports[0]['id'] args = {'port': {'admin_state_up': False, 'fixed_ips': [], 'device_owner': 'abcd'}} req = self.new_update_request('ports', args, port_id) res = self.deserialize('json', req.get_response(self.api)) # should fail self.assertEqual('InvalidInput', res['NeutronError']['type']) def test_create_compute_port_with_relay_no_router(self): """Compute port creation should fail if a network with dhcp relay is not connected to a router """ self._enable_dhcp_relay() with self.network() as network, \ self.subnet(network=network, enable_dhcp=True) as s1: device_owner = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'X' data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': device_owner, 'fixed_ips': [{'subnet_id': s1['subnet']['id']}], 'mac_address': '00:00:00:00:00:01'} } self.assertRaises(n_exc.InvalidInput, self.plugin.create_port, self.ctx, data) def test_create_compute_port_with_relay_and_router(self): self._enable_dhcp_relay() with self.network() as network, \ self.subnet(network=network, enable_dhcp=True) as s1,\ mock.patch.object(self.plugin, '_get_router', return_value={'name': 'dummy'}): # first create a router interface to simulate a router data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'port', 'admin_state_up': True, 'device_id': 'dummy', 'device_owner': l3_db.DEVICE_OWNER_ROUTER_INTF, 'fixed_ips': [{'subnet_id': s1['subnet']['id']}], 'mac_address': '00:00:00:00:00:02'} } port1 = self.plugin.create_port(self.ctx, data) self.assertIn('id', port1) # Now create a compute port device_owner = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'X' data = {'port': { 'network_id': network['network']['id'], 'tenant_id': self._tenant_id, 'name': 'port', 'admin_state_up': True, 'device_id': 'fake_device', 'device_owner': device_owner, 'fixed_ips': [{'subnet_id': s1['subnet']['id']}], 'mac_address': '00:00:00:00:00:01'} } port2 = self.plugin.create_port(self.ctx, data) self.assertIn('id', port2) def _test_create_direct_network(self, vlan_id=0): net_type = vlan_id and 'vlan' or 'flat' name = 'direct_net' providernet_args = {pnet.NETWORK_TYPE: net_type, pnet.PHYSICAL_NETWORK: 'tzuuid'} if vlan_id: providernet_args[pnet.SEGMENTATION_ID] = vlan_id mock_tt = mock.patch('vmware_nsxlib.v3' '.core_resources.NsxLibTransportZone' '.get_transport_type', return_value='VLAN') mock_tt.start() return self.network(name=name, providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK, pnet.SEGMENTATION_ID)) def _test_create_port_vnic_direct(self, vlan_id): with self._test_create_direct_network(vlan_id=vlan_id) as network: # Check that port security conflicts kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT, psec.PORTSECURITY: True} net_id = network['network']['id'] res = self._create_port(self.fmt, net_id=net_id, arg_list=(portbindings.VNIC_TYPE, psec.PORTSECURITY), **kwargs) self.assertEqual(res.status_int, exc.HTTPBadRequest.code) # Check that security group conflicts kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT, 'security_groups': [ '4cd70774-cc67-4a87-9b39-7d1db38eb087'], psec.PORTSECURITY: False} net_id = network['network']['id'] res = self._create_port(self.fmt, net_id=net_id, arg_list=(portbindings.VNIC_TYPE, psec.PORTSECURITY), **kwargs) self.assertEqual(res.status_int, exc.HTTPBadRequest.code) # All is kosher so we can create the port kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT} net_id = network['network']['id'] res = self._create_port(self.fmt, net_id=net_id, arg_list=(portbindings.VNIC_TYPE,), **kwargs) port = self.deserialize('json', res) self.assertEqual("direct", port['port'][portbindings.VNIC_TYPE]) self.assertEqual("dvs", port['port'][portbindings.VIF_TYPE]) self.assertEqual( vlan_id, port['port'][portbindings.VIF_DETAILS]['segmentation-id']) # try to get the same port req = self.new_show_request('ports', port['port']['id'], self.fmt) sport = self.deserialize(self.fmt, req.get_response(self.api)) self.assertEqual("dvs", sport['port'][portbindings.VIF_TYPE]) self.assertEqual("direct", sport['port'][portbindings.VNIC_TYPE]) self.assertEqual( vlan_id, sport['port'][portbindings.VIF_DETAILS]['segmentation-id']) self.assertFalse( sport['port'][portbindings.VIF_DETAILS]['vlan-transparent']) def test_create_port_vnic_direct_flat(self): self._test_create_port_vnic_direct(0) def test_create_port_vnic_direct_vlan(self): self._test_create_port_vnic_direct(10) def test_create_port_vnic_direct_invalid_network(self): with self.network(name='not vlan/flat') as net: kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT, psec.PORTSECURITY: False} net_id = net['network']['id'] res = self._create_port(self.fmt, net_id=net_id, arg_list=(portbindings.VNIC_TYPE, psec.PORTSECURITY), **kwargs) self.assertEqual(exc.HTTPBadRequest.code, res.status_int) def test_update_vnic_direct(self): with self._test_create_direct_network(vlan_id=7) as network: with self.subnet(network=network) as subnet: with self.port(subnet=subnet) as port: # need to do two updates as the update for port security # disabled requires that it can only change 2 items data = {'port': {psec.PORTSECURITY: False, 'security_groups': []}} req = self.new_update_request('ports', data, port['port']['id']) res = self.deserialize('json', req.get_response(self.api)) self.assertEqual(portbindings.VNIC_NORMAL, res['port'][portbindings.VNIC_TYPE]) data = {'port': {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT}} req = self.new_update_request('ports', data, port['port']['id']) res = self.deserialize('json', req.get_response(self.api)) self.assertEqual(portbindings.VNIC_DIRECT, res['port'][portbindings.VNIC_TYPE]) def test_port_invalid_vnic_type(self): with self._test_create_direct_network(vlan_id=7) as network: kwargs = {portbindings.VNIC_TYPE: 'invalid', psec.PORTSECURITY: False} net_id = network['network']['id'] res = self._create_port(self.fmt, net_id=net_id, arg_list=(portbindings.VNIC_TYPE, psec.PORTSECURITY), **kwargs) self.assertEqual(res.status_int, exc.HTTPBadRequest.code) def test_create_transparent_vlan_port(self): providernet_args = {pnet.NETWORK_TYPE: 'vlan', vlan_apidef.VLANTRANSPARENT: True} with mock.patch('vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='VLAN'): result = self._create_network(fmt='json', name='vlan_net', admin_state_up=True, providernet_args=providernet_args, arg_list=( pnet.NETWORK_TYPE, pnet.SEGMENTATION_ID, vlan_apidef.VLANTRANSPARENT)) network = self.deserialize('json', result) net_id = network['network']['id'] with self.subnet(network=network): kwargs = {portbindings.VNIC_TYPE: portbindings.VNIC_DIRECT} net_id = network['network']['id'] res = self._create_port(self.fmt, net_id=net_id, arg_list=(portbindings.VNIC_TYPE,), **kwargs) port = self.deserialize('json', res) self.assertTrue( port['port'][portbindings.VIF_DETAILS]['vlan-transparent']) @common_v3.with_disable_dhcp def test_requested_subnet_id_v4_and_v6(self): return super(TestPortsV2, self).test_requested_subnet_id_v4_and_v6() def test_port_binding_host(self): with self.port() as port: # add host data = {'port': {portbindings.HOST_ID: 'abc'}} req = self.new_update_request('ports', data, port['port']['id']) res = self.deserialize('json', req.get_response(self.api)) self.assertEqual('abc', res['port'][portbindings.HOST_ID]) # remove host data = {'port': {portbindings.HOST_ID: None}} req = self.new_update_request('ports', data, port['port']['id']) res = self.deserialize('json', req.get_response(self.api)) self.assertEqual('', res['port'][portbindings.HOST_ID]) class DHCPOptsTestCase(test_dhcpopts.TestExtraDhcpOpt, NsxV3PluginTestCaseMixin): def setUp(self, plugin=None): super(test_dhcpopts.ExtraDhcpOptDBTestCase, self).setUp( plugin=PLUGIN_NAME) class NSXv3DHCPAgentAZAwareWeightSchedulerTestCase( test_dhcpagent.DHCPAgentAZAwareWeightSchedulerTestCase, NsxV3PluginTestCaseMixin): def setUp(self): super(NSXv3DHCPAgentAZAwareWeightSchedulerTestCase, self).setUp() self.plugin = directory.get_plugin() self.ctx = context.get_admin_context() def setup_coreplugin(self, core_plugin=None, load_plugins=True): super(NSXv3DHCPAgentAZAwareWeightSchedulerTestCase, self).setup_coreplugin(core_plugin=PLUGIN_NAME, load_plugins=load_plugins) class TestL3ExtensionManager(object): def get_resources(self): # Simulate extension of L3 attribute map l3.L3().update_attributes_map( l3_egm_apidef.RESOURCE_ATTRIBUTE_MAP) l3.L3().update_attributes_map( xroute_apidef.RESOURCE_ATTRIBUTE_MAP) return (l3.L3.get_resources() + address_scope.Address_scope.get_resources()) def get_actions(self): return [] def get_request_extensions(self): return [] class L3NatTest(test_l3_plugin.L3BaseForIntTests, NsxV3PluginTestCaseMixin, common_v3.FixExternalNetBaseTest, common_v3.NsxV3SubnetMixin, test_address_scope.AddressScopeTestCase): def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None, service_plugins=None): cfg.CONF.set_override('api_extensions_path', vmware.NSXEXT_PATH) cfg.CONF.set_default('max_routes', 3) ext_mgr = ext_mgr or TestL3ExtensionManager() mock_nsx_version = mock.patch.object(nsx_plugin.utils, 'is_nsx_version_2_0_0', new=lambda v: True) mock_nsx_version.start() # Make sure the LB callback is not called on router deletion self.lb_mock1 = mock.patch( "vmware_nsx.services.lbaas.octavia.octavia_listener." "NSXOctaviaListenerEndpoint._check_lb_service_on_router") self.lb_mock1.start() self.lb_mock2 = mock.patch( "vmware_nsx.services.lbaas.octavia.octavia_listener." "NSXOctaviaListenerEndpoint._check_lb_service_on_router_interface") self.lb_mock2.start() super(L3NatTest, self).setUp( plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins) self.plugin_instance = directory.get_plugin() self._plugin_name = "%s.%s" % ( self.plugin_instance.__module__, self.plugin_instance.__class__.__name__) self._plugin_class = self.plugin_instance.__class__ self.plugin_instance.fwaas_callbacks = None self.original_subnet = self.subnet self.original_network = self.network def _set_net_external(self, net_id): # This action is not supported by the V3 plugin pass def external_network(self, name='net1', admin_state_up=True, fmt=None, **kwargs): if not name: name = 'l3_ext_net' physical_network = nsx_v3_mocks.DEFAULT_TIER0_ROUTER_UUID net_type = utils.NetworkTypes.L3_EXT providernet_args = {pnet.NETWORK_TYPE: net_type, pnet.PHYSICAL_NETWORK: physical_network} return self.original_network(name=name, admin_state_up=admin_state_up, fmt=fmt, router__external=True, providernet_args=providernet_args, arg_list=(pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK)) def test_floatingip_create_different_fixed_ip_same_port(self): self.skipTest('Multiple fixed ips on a port are not supported') def test_router_add_interface_multiple_ipv4_subnet_port_returns_400(self): self.skipTest('Multiple fixed ips on a port are not supported') def test_router_add_interface_multiple_ipv6_subnet_port(self): self.skipTest('Multiple fixed ips on a port are not supported') def test_floatingip_update_different_fixed_ip_same_port(self): self.skipTest('Multiple fixed ips on a port are not supported') def test_create_multiple_floatingips_same_fixed_ip_same_port(self): self.skipTest('Multiple fixed ips on a port are not supported') class TestL3NatTestCase(L3NatTest, test_l3_plugin.L3NatDBIntTestCase, test_ext_route.ExtraRouteDBTestCaseBase, test_metadata.MetaDataTestCase): block_dhcp_notifier = False def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None, service_plugins=None): super(TestL3NatTestCase, self).setUp(plugin=plugin, ext_mgr=ext_mgr) cfg.CONF.set_override('metadata_mode', None, 'nsx_v3') cfg.CONF.set_override('metadata_on_demand', False, 'nsx_v3') self.subnet_calls = [] def _test_create_l3_ext_network( self, physical_network=nsx_v3_mocks.DEFAULT_TIER0_ROUTER_UUID): name = 'l3_ext_net' net_type = utils.NetworkTypes.L3_EXT expected = [('subnets', []), ('name', name), ('admin_state_up', True), ('status', 'ACTIVE'), ('shared', False), (extnet_apidef.EXTERNAL, True), (pnet.NETWORK_TYPE, net_type), (pnet.PHYSICAL_NETWORK, physical_network)] with self._create_l3_ext_network(physical_network) as net: for k, v in expected: self.assertEqual(net['network'][k], v) @common_v3.with_external_subnet def test_router_update_gateway_with_external_ip_used_by_gw(self): super(TestL3NatTestCase, self).test_router_update_gateway_with_external_ip_used_by_gw() @common_v3.with_external_subnet def test_router_update_gateway_with_invalid_external_ip(self): super(TestL3NatTestCase, self).test_router_update_gateway_with_invalid_external_ip() @common_v3.with_external_subnet def test_router_update_gateway_with_invalid_external_subnet(self): super(TestL3NatTestCase, self).test_router_update_gateway_with_invalid_external_subnet() @common_v3.with_external_network def test_router_update_gateway_with_different_external_subnet(self): super(TestL3NatTestCase, self).test_router_update_gateway_with_different_external_subnet() @common_v3.with_disable_dhcp def test_create_floatingip_ipv6_only_network_returns_400(self): super(TestL3NatTestCase, self).test_create_floatingip_ipv6_only_network_returns_400() @common_v3.with_disable_dhcp def test_create_floatingip_with_assoc_to_ipv4_and_ipv6_port(self): super(L3NatTest, self).test_create_floatingip_with_assoc_to_ipv4_and_ipv6_port() @common_v3.with_external_subnet_once def test_router_update_gateway_with_existed_floatingip(self): with self.subnet(cidr='20.0.0.0/24') as subnet: self._set_net_external(subnet['subnet']['network_id']) with self.floatingip_with_assoc() as fip: self._add_external_gateway_to_router( fip['floatingip']['router_id'], subnet['subnet']['network_id'], expected_code=exc.HTTPConflict.code) @common_v3.with_external_network def test_router_update_gateway_add_multiple_prefixes_ipv6(self): super(TestL3NatTestCase, self).test_router_update_gateway_add_multiple_prefixes_ipv6() @common_v3.with_external_network def test_router_concurrent_delete_upon_subnet_create(self): super(TestL3NatTestCase, self).test_router_concurrent_delete_upon_subnet_create() @common_v3.with_external_network def test_router_update_gateway_upon_subnet_create_ipv6(self): super(TestL3NatTestCase, self).test_router_update_gateway_upon_subnet_create_ipv6() @common_v3.with_external_subnet def test_router_add_gateway_dup_subnet2_returns_400(self): super(TestL3NatTestCase, self).test_router_add_gateway_dup_subnet2_returns_400() @common_v3.with_external_subnet def test_router_update_gateway(self): super(TestL3NatTestCase, self).test_router_update_gateway() @common_v3.with_external_subnet def test_router_create_with_gwinfo(self): super(TestL3NatTestCase, self).test_router_create_with_gwinfo() @common_v3.with_external_subnet def test_router_clear_gateway_callback_failure_returns_409(self): super(TestL3NatTestCase, self).test_router_clear_gateway_callback_failure_returns_409() @common_v3.with_external_subnet def test_router_create_with_gwinfo_ext_ip(self): super(TestL3NatTestCase, self).test_router_create_with_gwinfo_ext_ip() @common_v3.with_external_network def test_router_create_with_gwinfo_ext_ip_subnet(self): super(TestL3NatTestCase, self).test_router_create_with_gwinfo_ext_ip_subnet() @common_v3.with_external_subnet_second_time def test_router_delete_with_floatingip_existed_returns_409(self): super(TestL3NatTestCase, self).test_router_delete_with_floatingip_existed_returns_409() @common_v3.with_external_subnet def test_router_add_and_remove_gateway_tenant_ctx(self): super(TestL3NatTestCase, self).test_router_add_and_remove_gateway_tenant_ctx() @common_v3.with_external_subnet def test_router_add_and_remove_gateway(self): super(TestL3NatTestCase, self).test_router_add_and_remove_gateway() def test_router_set_gateway_cidr_overlapped_with_subnets(self): self.skipTest('2 dhcp subnets not supported') def test_router_update_gateway_upon_subnet_create_max_ips_ipv6(self): self.skipTest('not supported') def test_router_add_gateway_multiple_subnets_ipv6(self): self.skipTest('multiple ipv6 subnets not supported') def test__notify_gateway_port_ip_changed(self): self.skipTest('not supported') def test__notify_gateway_port_ip_not_changed(self): self.skipTest('not supported') def test_floatingip_via_router_interface_returns_201(self): self.skipTest('not supported') def test_floatingip_via_router_interface_returns_404(self): self.skipTest('not supported') def test_router_delete_dhcpv6_stateless_subnet_inuse_returns_409(self): self.skipTest('DHCPv6 not supported') def test_router_add_interface_ipv6_subnet(self): self.skipTest('DHCPv6 not supported') def test_update_router_interface_port_ipv6_subnet_ext_ra(self): self.skipTest('DHCPv6 not supported') @common_v3.with_disable_dhcp def test_router_add_interface_ipv6_subnet_without_gateway_ip(self): super(TestL3NatTestCase, self).test_router_add_interface_ipv6_subnet_without_gateway_ip() @common_v3.with_disable_dhcp def test_router_add_interface_multiple_ipv6_subnets_different_net(self): super(TestL3NatTestCase, self).\ test_router_add_interface_multiple_ipv6_subnets_different_net() @common_v3.with_disable_dhcp def test_create_floatingip_with_assoc_to_ipv6_subnet(self): super(TestL3NatTestCase, self).test_create_floatingip_with_assoc_to_ipv6_subnet() def test_router_add_iface_ipv6_ext_ra_subnet_returns_400(self): self.skipTest('DHCPv6 not supported') @common_v3.with_external_subnet def test_floatingip_list_with_sort(self): super(TestL3NatTestCase, self).test_floatingip_list_with_sort() @common_v3.with_external_subnet_once def test_floatingip_with_assoc_fails(self): super(TestL3NatTestCase, self).test_floatingip_with_assoc_fails() @common_v3.with_external_subnet_second_time def test_floatingip_update_same_fixed_ip_same_port(self): super(TestL3NatTestCase, self).test_floatingip_update_same_fixed_ip_same_port() @common_v3.with_external_subnet def test_floatingip_list_with_pagination_reverse(self): super(TestL3NatTestCase, self).test_floatingip_list_with_pagination_reverse() @common_v3.with_external_subnet_once def test_floatingip_association_on_unowned_router(self): super(TestL3NatTestCase, self).test_floatingip_association_on_unowned_router() @common_v3.with_external_network def test_delete_ext_net_with_disassociated_floating_ips(self): super(TestL3NatTestCase, self).test_delete_ext_net_with_disassociated_floating_ips() @common_v3.with_external_network def test_create_floatingip_with_subnet_and_invalid_fip_address(self): super( TestL3NatTestCase, self).test_create_floatingip_with_subnet_and_invalid_fip_address() @common_v3.with_external_subnet def test_create_floatingip_with_duplicated_specific_ip(self): super(TestL3NatTestCase, self).test_create_floatingip_with_duplicated_specific_ip() @common_v3.with_external_subnet def test_create_floatingip_with_subnet_id_non_admin(self): super(TestL3NatTestCase, self).test_create_floatingip_with_subnet_id_non_admin() @common_v3.with_external_subnet def test_floatingip_list_with_pagination(self): super(TestL3NatTestCase, self).test_floatingip_list_with_pagination() @common_v3.with_external_subnet def test_create_floatingips_native_quotas(self): super(TestL3NatTestCase, self).test_create_floatingips_native_quotas() @common_v3.with_external_network def test_create_floatingip_with_multisubnet_id(self): super(TestL3NatTestCase, self).test_create_floatingip_with_multisubnet_id() @common_v3.with_external_network def test_create_floatingip_with_subnet_id_and_fip_address(self): super(TestL3NatTestCase, self).test_create_floatingip_with_subnet_id_and_fip_address() @common_v3.with_external_subnet def test_create_floatingip_with_specific_ip(self): super(TestL3NatTestCase, self).test_create_floatingip_with_specific_ip() @common_v3.with_external_network def test_create_floatingip_ipv6_and_ipv4_network_creates_ipv4(self): super(TestL3NatTestCase, self).test_create_floatingip_ipv6_and_ipv4_network_creates_ipv4() @common_v3.with_external_subnet_once def test_create_floatingip_non_admin_context_agent_notification(self): super( TestL3NatTestCase, self).test_create_floatingip_non_admin_context_agent_notification() @common_v3.with_external_subnet def test_create_floatingip_no_ext_gateway_return_404(self): super(TestL3NatTestCase, self).test_create_floatingip_no_ext_gateway_return_404() @common_v3.with_external_subnet def test_create_floatingip_with_specific_ip_out_of_allocation(self): super(TestL3NatTestCase, self).test_create_floatingip_with_specific_ip_out_of_allocation() @common_v3.with_external_subnet_third_time def test_floatingip_update_different_router(self): super(TestL3NatTestCase, self).test_floatingip_update_different_router() def test_router_add_gateway_notifications(self): with self.router() as r,\ self._create_l3_ext_network() as ext_net,\ self.subnet(network=ext_net): with mock.patch.object(registry, 'publish') as publish: self._add_external_gateway_to_router( r['router']['id'], ext_net['network']['id']) expected = [mock.call( resources.ROUTER_GATEWAY, events.AFTER_CREATE, mock.ANY, payload=mock.ANY)] publish.assert_has_calls(expected) def test_create_l3_ext_network_with_default_tier0(self): self._test_create_l3_ext_network() def test_floatingip_update(self): super(TestL3NatTestCase, self).test_floatingip_update( expected_status=constants.FLOATINGIP_STATUS_DOWN) @common_v3.with_external_subnet_second_time def test_floatingip_with_invalid_create_port(self): self._test_floatingip_with_invalid_create_port(self._plugin_name) def test_network_update_external(self): # This plugin does not support updating the external flag of a network self.skipTest('not supported') def test_network_update_external_failure(self): # This plugin does not support updating the external flag of a network # This is tested with a different test self.skipTest('not supported') def test_router_add_gateway_dup_subnet1_returns_400(self): self.skipTest('not supported') def test_router_add_interface_dup_subnet2_returns_400(self): self.skipTest('not supported') def test_router_add_interface_ipv6_port_existing_network_returns_400(self): self.skipTest('multiple ipv6 subnets not supported') def test_routes_update_for_multiple_routers(self): self.skipTest('not supported') def test_floatingip_multi_external_one_internal(self): self.skipTest('not supported') def test_floatingip_same_external_and_internal(self): self.skipTest('not supported') def test_route_update_with_external_route(self): self.skipTest('not supported') def test_floatingip_update_subnet_gateway_disabled(self): self.skipTest('not supported') def test_router_add_interface_by_port_other_tenant_address_out_of_pool( self): # multiple fixed ips per port are not supported self.skipTest('not supported') def test_router_add_interface_by_port_other_tenant_address_in_pool(self): # multiple fixed ips per port are not supported self.skipTest('not supported') def test_router_add_interface_by_port_admin_address_out_of_pool(self): # multiple fixed ips per port are not supported self.skipTest('not supported') def test_router_delete_with_lb_service(self): self.lb_mock1.stop() self.lb_mock2.stop() # Create the LB object - here the delete callback is registered loadbalancer = loadbalancer_mgr.EdgeLoadBalancerManagerFromDict() oct_listener = octavia_listener.NSXOctaviaListenerEndpoint( loadbalancer=loadbalancer) with self.router() as router: with mock.patch('vmware_nsxlib.v3.load_balancer.Service.' 'get_router_lb_service'),\ mock.patch('vmware_nsx.db.db.get_nsx_router_id', return_value='1'),\ mock.patch.object( nsx_db, 'has_nsx_lbaas_loadbalancer_binding_by_router', return_value=True): self.assertRaises(nc_exc.CallbackFailure, self.plugin_instance.delete_router, context.get_admin_context(), router['router']['id']) # Unregister callback oct_listener._unsubscribe_router_delete_callback() self.lb_mock1.start() self.lb_mock2.start() def test_multiple_subnets_on_different_routers(self): with self.network() as network: with self.subnet(network=network) as s1,\ self.subnet(network=network, cidr='11.0.0.0/24') as s2,\ self.router() as r1,\ self.router() as r2: self._router_interface_action('add', r1['router']['id'], s1['subnet']['id'], None) self.assertRaises(n_exc.Conflict, self.plugin_instance.add_router_interface, context.get_admin_context(), r2['router']['id'], {'subnet_id': s2['subnet']['id']}) self._router_interface_action('remove', r1['router']['id'], s1['subnet']['id'], None) self._router_interface_action('add', r2['router']['id'], s2['subnet']['id'], None) self._router_interface_action('remove', r2['router']['id'], s2['subnet']['id'], None) def test_multiple_subnets_on_same_router(self): with self.network() as network: with self.subnet(network=network) as s1,\ self.subnet(network=network, cidr='11.0.0.0/24') as s2,\ self.router() as r1: self._router_interface_action('add', r1['router']['id'], s1['subnet']['id'], None) self.assertRaises(n_exc.InvalidInput, self.plugin_instance.add_router_interface, context.get_admin_context(), r1['router']['id'], {'subnet_id': s2['subnet']['id']}) self._router_interface_action('remove', r1['router']['id'], s1['subnet']['id'], None) def test_router_remove_interface_inuse_return_409(self): with self.router() as r1,\ self._create_l3_ext_network() as ext_net,\ self.subnet(network=ext_net) as ext_subnet,\ self.subnet(cidr='11.0.0.0/24') as s1: self._router_interface_action( 'add', r1['router']['id'], s1['subnet']['id'], None) self._add_external_gateway_to_router( r1['router']['id'], ext_subnet['subnet']['network_id']) with self.port(subnet=s1,) as p: fip_res = self._create_floatingip( self.fmt, ext_subnet['subnet']['network_id'], subnet_id=ext_subnet['subnet']['id'], port_id=p['port']['id']) fip = self.deserialize(self.fmt, fip_res) self._router_interface_action( 'remove', r1['router']['id'], s1['subnet']['id'], None, expected_code=exc.HTTPConflict.code) self._delete('floatingips', fip['floatingip']['id']) self._remove_external_gateway_from_router( r1['router']['id'], ext_subnet['subnet']['network_id']) self._router_interface_action('remove', r1['router']['id'], s1['subnet']['id'], None) def test_router_update_on_external_port(self): with self.router() as r: with self._create_l3_ext_network() as ext_net,\ self.subnet(network=ext_net, cidr='10.0.1.0/24') as s: self._add_external_gateway_to_router( r['router']['id'], s['subnet']['network_id']) body = self._show('routers', r['router']['id']) net_id = body['router']['external_gateway_info']['network_id'] self.assertEqual(net_id, s['subnet']['network_id']) port_res = self._list_ports( 'json', 200, s['subnet']['network_id'], tenant_id=r['router']['tenant_id'], device_owner=constants.DEVICE_OWNER_ROUTER_GW) port_list = self.deserialize('json', port_res) self.assertEqual(len(port_list['ports']), 1) routes = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.3'}] self.assertRaises(n_exc.InvalidInput, self.plugin_instance.update_router, context.get_admin_context(), r['router']['id'], {'router': {'routes': routes}}) updates = {'admin_state_up': False} self.assertRaises(n_exc.InvalidInput, self.plugin_instance.update_router, context.get_admin_context(), r['router']['id'], {'router': updates}) self._remove_external_gateway_from_router( r['router']['id'], s['subnet']['network_id']) body = self._show('routers', r['router']['id']) gw_info = body['router']['external_gateway_info'] self.assertIsNone(gw_info) def test_router_on_vlan_net(self): providernet_args = {pnet.NETWORK_TYPE: 'vlan', pnet.SEGMENTATION_ID: 10} with mock.patch('vmware_nsxlib.v3.core_resources.NsxLibTransportZone.' 'get_transport_type', return_value='VLAN'): result = self._create_network(fmt='json', name='badvlan_net', admin_state_up=True, providernet_args=providernet_args, arg_list=( pnet.NETWORK_TYPE, pnet.SEGMENTATION_ID)) vlan_network = self.deserialize('json', result) with self.router() as r1,\ self._create_l3_ext_network() as ext_net,\ self.subnet(network=ext_net) as ext_subnet,\ self.subnet(cidr='11.0.0.0/24', network=vlan_network) as s1: # adding a vlan interface with no GW should fail self._router_interface_action( 'add', r1['router']['id'], s1['subnet']['id'], None, expected_code=400) # adding GW self._add_external_gateway_to_router( r1['router']['id'], ext_subnet['subnet']['network_id']) # adding the vlan interface self._router_interface_action( 'add', r1['router']['id'], s1['subnet']['id'], None) # adding a floating ip with self.port(subnet=s1) as p: fip_res = self._create_floatingip( self.fmt, ext_subnet['subnet']['network_id'], subnet_id=ext_subnet['subnet']['id'], port_id=p['port']['id']) fip = self.deserialize(self.fmt, fip_res) self.assertEqual(p['port']['id'], fip['floatingip']['port_id']) def test_create_router_gateway_fails(self): self.skipTest('not supported') def test_router_remove_ipv6_subnet_from_interface(self): self.skipTest('not supported') def test_router_add_interface_multiple_ipv6_subnets_same_net(self): self.skipTest('not supported') def test_router_add_interface_multiple_ipv4_subnets(self): self.skipTest('not supported') def test_floatingip_update_to_same_port_id_twice(self): self.skipTest('Plugin changes floating port status') def _test_create_subnetpool(self, prefixes, expected=None, admin=False, **kwargs): keys = kwargs.copy() keys.setdefault('tenant_id', self._tenant_id) with self.subnetpool(prefixes, admin, **keys) as subnetpool: self._validate_resource(subnetpool, keys, 'subnetpool') if expected: self._compare_resource(subnetpool, expected, 'subnetpool') return subnetpool def _update_router_enable_snat(self, router_id, network_id, enable_snat): return self._update('routers', router_id, {'router': {'external_gateway_info': {'network_id': network_id, 'enable_snat': enable_snat}}}) def test_router_no_snat_with_different_address_scope(self): """Test that if the router has no snat, you cannot add an interface from a different address scope than the gateway. """ # create an external network on one address scope with self.address_scope(name='as1') as addr_scope, \ self._create_l3_ext_network() as ext_net: as_id = addr_scope['address_scope']['id'] subnet = netaddr.IPNetwork('10.10.10.0/24') subnetpool = self._test_create_subnetpool( [subnet.cidr], name='sp1', min_prefixlen='24', address_scope_id=as_id) subnetpool_id = subnetpool['subnetpool']['id'] data = {'subnet': { 'network_id': ext_net['network']['id'], 'subnetpool_id': subnetpool_id, 'ip_version': 4, 'enable_dhcp': False, 'tenant_id': ext_net['network']['tenant_id']}} req = self.new_create_request('subnets', data) ext_subnet = self.deserialize(self.fmt, req.get_response(self.api)) # create a regular network on another address scope with self.address_scope(name='as2') as addr_scope2, \ self.network() as net: as_id2 = addr_scope2['address_scope']['id'] subnet2 = netaddr.IPNetwork('20.10.10.0/24') subnetpool2 = self._test_create_subnetpool( [subnet2.cidr], name='sp2', min_prefixlen='24', address_scope_id=as_id2) subnetpool_id2 = subnetpool2['subnetpool']['id'] data = {'subnet': { 'network_id': net['network']['id'], 'subnetpool_id': subnetpool_id2, 'ip_version': 4, 'tenant_id': net['network']['tenant_id']}} req = self.new_create_request('subnets', data) int_subnet = self.deserialize( self.fmt, req.get_response(self.api)) # create a no snat router with this gateway with self.router() as r: self._add_external_gateway_to_router( r['router']['id'], ext_subnet['subnet']['network_id']) self._update_router_enable_snat( r['router']['id'], ext_subnet['subnet']['network_id'], False) # should fail adding the interface to the router err_code = exc.HTTPBadRequest.code self._router_interface_action('add', r['router']['id'], int_subnet['subnet']['id'], None, err_code) def test_router_no_snat_with_same_address_scope(self): """Test that if the router has no snat, you can add an interface from the same address scope as the gateway. """ # create an external network on one address scope with self.address_scope(name='as1') as addr_scope, \ self._create_l3_ext_network() as ext_net: as_id = addr_scope['address_scope']['id'] subnet = netaddr.IPNetwork('10.10.10.0/21') subnetpool = self._test_create_subnetpool( [subnet.cidr], name='sp1', min_prefixlen='24', address_scope_id=as_id) subnetpool_id = subnetpool['subnetpool']['id'] data = {'subnet': { 'network_id': ext_net['network']['id'], 'subnetpool_id': subnetpool_id, 'ip_version': 4, 'enable_dhcp': False, 'tenant_id': ext_net['network']['tenant_id']}} req = self.new_create_request('subnets', data) ext_subnet = self.deserialize(self.fmt, req.get_response(self.api)) # create a regular network on the same address scope with self.network() as net: data = {'subnet': { 'network_id': net['network']['id'], 'subnetpool_id': subnetpool_id, 'ip_version': 4, 'tenant_id': net['network']['tenant_id']}} req = self.new_create_request('subnets', data) int_subnet = self.deserialize( self.fmt, req.get_response(self.api)) # create a no snat router with this gateway with self.router() as r: self._add_external_gateway_to_router( r['router']['id'], ext_subnet['subnet']['network_id']) self._update_router_enable_snat( r['router']['id'], ext_subnet['subnet']['network_id'], False) # should succeed adding the interface to the router self._router_interface_action('add', r['router']['id'], int_subnet['subnet']['id'], None) def _mock_add_snat_rule(self): return mock.patch("vmware_nsxlib.v3.router.RouterLib." "add_gw_snat_rule") def _mock_add_remove_service_router(self): return mock.patch("vmware_nsxlib.v3.core_resources." "NsxLibLogicalRouter.update") def _mock_del_snat_rule(self): return mock.patch("vmware_nsxlib.v3.router.RouterLib." "delete_gw_snat_rule_by_source") def _prepare_external_subnet_on_address_scope(self, ext_net, addr_scope): as_id = addr_scope['address_scope']['id'] subnet = netaddr.IPNetwork('10.10.10.0/21') subnetpool = self._test_create_subnetpool( [subnet.cidr], name='sp1', min_prefixlen='24', address_scope_id=as_id) subnetpool_id = subnetpool['subnetpool']['id'] data = {'subnet': { 'network_id': ext_net['network']['id'], 'subnetpool_id': subnetpool_id, 'ip_version': 4, 'enable_dhcp': False, 'tenant_id': ext_net['network']['tenant_id']}} req = self.new_create_request('subnets', data) ext_subnet = self.deserialize(self.fmt, req.get_response(self.api)) return ext_subnet['subnet'] def _create_subnet_and_assert_snat_rules(self, subnetpool_id, router_id, assert_snat_deleted=False, assert_snat_added=False): # create a regular network on the given subnet pool with self.network() as net: data = {'subnet': { 'network_id': net['network']['id'], 'subnetpool_id': subnetpool_id, 'ip_version': 4, 'tenant_id': net['network']['tenant_id']}} req = self.new_create_request('subnets', data) int_subnet = self.deserialize( self.fmt, req.get_response(self.api)) with self._mock_add_snat_rule() as add_nat,\ self._mock_del_snat_rule() as delete_nat: # Add the interface self._router_interface_action( 'add', router_id, int_subnet['subnet']['id'], None) if assert_snat_deleted: delete_nat.assert_called() else: delete_nat.assert_not_called() if assert_snat_added: add_nat.assert_called() else: add_nat.assert_not_called() def test_add_service_router_enable_snat(self): with self.address_scope(name='as1') as addr_scope, \ self._create_l3_ext_network() as ext_net: ext_subnet = self._prepare_external_subnet_on_address_scope( ext_net, addr_scope) # create a router with this gateway with self.router() as r, \ mock.patch("vmware_nsxlib.v3.router.RouterLib." "has_service_router", return_value=False),\ self._mock_add_remove_service_router() as change_sr: router_id = r['router']['id'] self._add_external_gateway_to_router( router_id, ext_subnet['network_id']) # Checking that router update is being called with # edge_cluster_uuid, for creating a service router change_sr.assert_any_call( mock.ANY, edge_cluster_id=NSX_EDGE_CLUSTER_UUID, enable_standby_relocation=True) def test_remove_service_router_disable_snat(self): with self.address_scope(name='as1') as addr_scope, \ self._create_l3_ext_network() as ext_net: ext_subnet = self._prepare_external_subnet_on_address_scope( ext_net, addr_scope) # create a router with this gateway, disable snat with self.router() as r: self._add_external_gateway_to_router( r['router']['id'], ext_subnet['network_id']) with mock.patch("vmware_nsxlib.v3.router.RouterLib." "has_service_router", return_value=True),\ self._mock_add_remove_service_router() as change_sr: self._update_router_enable_snat( r['router']['id'], ext_subnet['network_id'], False) # Checking that router update is being called # and setting edge_cluster_uuid to None, for service # router removal. change_sr.assert_called_once_with( mock.ANY, edge_cluster_id=None, enable_standby_relocation=False) def test_router_address_scope_snat_rules(self): """Test that if the router interface had the same address scope as the gateway - snat rule is not added. """ # create an external network on one address scope with self.address_scope(name='as1') as addr_scope, \ self._create_l3_ext_network() as ext_net: ext_subnet = self._prepare_external_subnet_on_address_scope( ext_net, addr_scope) # create a router with this gateway with self.router() as r: self._add_external_gateway_to_router( r['router']['id'], ext_subnet['network_id']) # create a regular network on same address scope # and verify no snat change as_id = addr_scope['address_scope']['id'] subnet = netaddr.IPNetwork('30.10.10.0/24') subnetpool = self._test_create_subnetpool( [subnet.cidr], name='sp2', min_prefixlen='24', address_scope_id=as_id) as_id = addr_scope['address_scope']['id'] subnetpool_id = subnetpool['subnetpool']['id'] self._create_subnet_and_assert_snat_rules( subnetpool_id, r['router']['id']) # create a regular network on a different address scope # and verify snat rules are added with self.address_scope(name='as2') as addr_scope2: as2_id = addr_scope2['address_scope']['id'] subnet2 = netaddr.IPNetwork('20.10.10.0/24') subnetpool2 = self._test_create_subnetpool( [subnet2.cidr], name='sp2', min_prefixlen='24', address_scope_id=as2_id) subnetpool2_id = subnetpool2['subnetpool']['id'] self._create_subnet_and_assert_snat_rules( subnetpool2_id, r['router']['id'], assert_snat_added=True) def _test_router_address_scope_change(self, change_gw=False): """When subnetpool address scope changes, and router that was originally under same address scope, results having different address scopes, relevant snat rules are added. """ # create an external network on one address scope with self.address_scope(name='as1') as addr_scope, \ self._create_l3_ext_network() as ext_net: ext_subnet = self._prepare_external_subnet_on_address_scope( ext_net, addr_scope) # create a router with this gateway with self.router() as r: self._add_external_gateway_to_router( r['router']['id'], ext_subnet['network_id']) # create a regular network on same address scope # and verify no snat change as_id = addr_scope['address_scope']['id'] subnet2 = netaddr.IPNetwork('40.10.10.0/24') subnetpool2 = self._test_create_subnetpool( [subnet2.cidr], name='sp2', min_prefixlen='24', address_scope_id=as_id) subnetpool2_id = subnetpool2['subnetpool']['id'] self._create_subnet_and_assert_snat_rules( subnetpool2_id, r['router']['id']) # change address scope of the first subnetpool with self.address_scope(name='as2') as addr_scope2,\ self._mock_add_snat_rule() as add_nat: as2_id = addr_scope2['address_scope']['id'] data = {'subnetpool': { 'address_scope_id': as2_id}} if change_gw: subnetpool_to_update = ext_subnet['subnetpool_id'] else: subnetpool_to_update = subnetpool2_id req = self.new_update_request('subnetpools', data, subnetpool_to_update) req.get_response(self.api) add_nat.assert_called_once() def test_router_address_scope_change(self): self._test_router_address_scope_change() def test_router_address_scope_gw_change(self): self._test_router_address_scope_change(change_gw=True) def _test_3leg_router_address_scope_change(self, change_gw=False, change_2gw=False): """Test address scope change scenarios with router that covers 3 address scopes """ # create an external network on one address scope with self.address_scope(name='as1') as as1, \ self.address_scope(name='as2') as as2, \ self.address_scope(name='as3') as as3, \ self._create_l3_ext_network() as ext_net: ext_subnet = self._prepare_external_subnet_on_address_scope( ext_net, as1) as1_id = as1['address_scope']['id'] # create a router with this gateway with self.router() as r: self._add_external_gateway_to_router( r['router']['id'], ext_subnet['network_id']) # create a regular network on address scope 2 # and verify snat change as2_id = as2['address_scope']['id'] subnet2 = netaddr.IPNetwork('20.10.10.0/24') subnetpool2 = self._test_create_subnetpool( [subnet2.cidr], name='sp2', min_prefixlen='24', address_scope_id=as2_id) subnetpool2_id = subnetpool2['subnetpool']['id'] self._create_subnet_and_assert_snat_rules( subnetpool2_id, r['router']['id'], assert_snat_added=True) # create a regular network on address scope 3 # verify no snat change as3_id = as3['address_scope']['id'] subnet3 = netaddr.IPNetwork('30.10.10.0/24') subnetpool3 = self._test_create_subnetpool( [subnet3.cidr], name='sp2', min_prefixlen='24', address_scope_id=as3_id) subnetpool3_id = subnetpool3['subnetpool']['id'] self._create_subnet_and_assert_snat_rules( subnetpool3_id, r['router']['id'], assert_snat_added=True) with self._mock_add_snat_rule() as add_nat, \ self._mock_del_snat_rule() as del_nat: if change_gw: # change address scope of GW subnet subnetpool_to_update = ext_subnet['subnetpool_id'] else: subnetpool_to_update = subnetpool2_id if change_2gw: # change subnet2 to be in GW address scope target_as = as1_id else: target_as = as3_id data = {'subnetpool': { 'address_scope_id': target_as}} req = self.new_update_request('subnetpools', data, subnetpool_to_update) req.get_response(self.api) if change_gw: # The test changed address scope of gw subnet. # Both previous rules should be deleted, # and one new rule for subnet2 should be added del_nat.assert_called() self.assertEqual(2, del_nat.call_count) add_nat.assert_called_once() else: if change_2gw: # The test changed address scope of subnet2 to be # same as GW address scope. # Snat rule for as2 will be deleted. No effect on as3 # rule. del_nat.assert_called_once() else: # The test changed address scope of subnet2 to # as3. Affected snat rule should be re-created. del_nat.assert_called_once() add_nat.assert_called_once() def test_3leg_router_address_scope_change(self): self._test_3leg_router_address_scope_change() def test_3leg_router_address_scope_change_to_gw(self): self._test_3leg_router_address_scope_change(change_2gw=True) def test_3leg_router_gw_address_scope_change(self): self._test_3leg_router_address_scope_change(change_gw=True) def test_subnetpool_router_address_scope_change_no_effect(self): """When all router interfaces are allocated from same subnetpool, changing address scope on this subnetpool should not affect snat rules. """ # create an external network on one address scope with self.address_scope(name='as1') as addr_scope, \ self._create_l3_ext_network() as ext_net: ext_subnet = self._prepare_external_subnet_on_address_scope( ext_net, addr_scope) # create a router with this gateway with self.router() as r: self._add_external_gateway_to_router( r['router']['id'], ext_subnet['network_id']) # create a regular network on same address scope # and verify no snat change self._create_subnet_and_assert_snat_rules( ext_subnet['subnetpool_id'], r['router']['id']) with self.address_scope(name='as2') as addr_scope2,\ self._mock_add_snat_rule() as add_nat,\ self._mock_del_snat_rule() as delete_nat: as2_id = addr_scope2['address_scope']['id'] # change address scope of the subnetpool data = {'subnetpool': { 'address_scope_id': as2_id}} req = self.new_update_request('subnetpools', data, ext_subnet['subnetpool_id']) req.get_response(self.api) add_nat.assert_not_called() delete_nat.assert_not_called() def test_router_admin_state(self): """It is not allowed to set the router admin-state to down""" with self.router() as r: self._update('routers', r['router']['id'], {'router': {'admin_state_up': False}}, expected_code=exc.HTTPBadRequest.code) def test_router_dhcp_relay_dhcp_enabled(self): """Verify that the relay service is added to the router interface""" self._enable_dhcp_relay() with self.network() as network: with mock.patch.object(self.plugin, 'validate_router_dhcp_relay'),\ self.subnet(network=network, enable_dhcp=True) as s1,\ self.router() as r1,\ mock.patch.object(self.plugin.nsxlib.logical_router_port, 'update') as mock_update_port: self._router_interface_action('add', r1['router']['id'], s1['subnet']['id'], None) mock_update_port.assert_called_once_with( mock.ANY, relay_service_uuid=NSX_DHCP_RELAY_SRV, subnets=mock.ANY) def test_router_dhcp_relay_dhcp_disabled(self): """Verify that the relay service is not added to the router interface If the subnet do not have enabled dhcp """ self._enable_dhcp_relay() with self.network() as network: with mock.patch.object(self.plugin, 'validate_router_dhcp_relay'),\ self.subnet(network=network, enable_dhcp=False) as s1,\ self.router() as r1,\ mock.patch.object(self.plugin.nsxlib.logical_router_port, 'update') as mock_update_port: self._router_interface_action('add', r1['router']['id'], s1['subnet']['id'], None) mock_update_port.assert_called_once_with( mock.ANY, relay_service_uuid=None, subnets=mock.ANY) def test_router_dhcp_relay_no_ipam(self): """Verify that a router cannot be created with relay and no ipam""" # Add the relay service to the config and availability zones self._enable_dhcp_relay() self.assertRaises(n_exc.InvalidInput, self.plugin_instance.create_router, context.get_admin_context(), {'router': {'name': 'rtr'}}) def test_router_add_gateway_no_subnet_forbidden(self): with self.router() as r: with self._create_l3_ext_network() as n: self._add_external_gateway_to_router( r['router']['id'], n['network']['id'], expected_code=exc.HTTPBadRequest.code) def test_router_add_gateway_no_subnet(self): self.skipTest('No support for no subnet gateway set') @mock.patch.object(nsx_plugin.NsxV3Plugin, 'validate_availability_zones') def test_create_router_with_availability_zone(self, mock_validate_az): name = 'rtr-with-zone' zone = ['zone1'] mock_validate_az.return_value = None with self.router(name=name, availability_zone_hints=zone) as rtr: az_hints = rtr['router']['availability_zone_hints'] self.assertListEqual(zone, az_hints) def _test_route_update_illegal(self, destination): routes = [{'destination': destination, 'nexthop': '10.0.1.3'}] with self.router() as r: with self.subnet(cidr='10.0.1.0/24') as s: fixed_ip_data = [{'ip_address': '10.0.1.2'}] with self.port(subnet=s, fixed_ips=fixed_ip_data) as p: self._router_interface_action( 'add', r['router']['id'], None, p['port']['id']) self._update('routers', r['router']['id'], {'router': {'routes': routes}}, expected_code=400) def test_route_update_illegal(self): self._test_route_update_illegal('0.0.0.0/0') self._test_route_update_illegal('0.0.0.0/16') def test_update_router_distinct_edge_cluster(self): self.mock_get_edge_cluster.stop() edge_cluster = uuidutils.generate_uuid() mock.patch( "vmware_nsxlib.v3.core_resources.NsxLibEdgeCluster." "get_id_by_name_or_id", return_value=edge_cluster).start() cfg.CONF.set_override('edge_cluster', edge_cluster, 'nsx_v3') self._initialize_azs() with self.address_scope(name='as1') as addr_scope, \ self._create_l3_ext_network() as ext_net: ext_subnet = self._prepare_external_subnet_on_address_scope( ext_net, addr_scope) # create a router with this gateway with self.router() as r, \ mock.patch("vmware_nsxlib.v3.router.RouterLib." "has_service_router", return_value=False),\ self._mock_add_remove_service_router() as change_sr: router_id = r['router']['id'] self._add_external_gateway_to_router( router_id, ext_subnet['network_id']) change_sr.assert_any_call( mock.ANY, edge_cluster_id=edge_cluster, enable_standby_relocation=True) self.mock_get_edge_cluster.start() def test_router_add_interface_cidr_overlapped_with_gateway(self): with self.router() as r,\ self._create_l3_ext_network() as ext_net,\ self.subnet(cidr='10.0.1.0/24') as s1,\ self.subnet(network=ext_net, cidr='10.0.0.0/16', enable_dhcp=False) as s2: self._add_external_gateway_to_router( r['router']['id'], s2['subnet']['network_id']) res = self._router_interface_action( 'add', r['router']['id'], s1['subnet']['id'], None, expected_code=exc.HTTPBadRequest.code) self.assertIn('NeutronError', res) def test_router_add_gateway_overlapped_with_interface_cidr(self): with self.router() as r,\ self._create_l3_ext_network() as ext_net,\ self.subnet(cidr='10.0.1.0/24') as s1,\ self.subnet(network=ext_net, cidr='10.0.0.0/16', enable_dhcp=False) as s2: self._router_interface_action( 'add', r['router']['id'], s1['subnet']['id'], None) res = self._add_external_gateway_to_router( r['router']['id'], s2['subnet']['network_id'], expected_code=exc.HTTPBadRequest.code) self.assertIn('NeutronError', res) def test_router_add_interface_by_port_cidr_overlapped_with_gateway(self): with self.router() as r,\ self._create_l3_ext_network() as ext_net,\ self.subnet(cidr='10.0.1.0/24') as s1,\ self.subnet(network=ext_net, cidr='10.0.0.0/16', enable_dhcp=False) as s2,\ self.port(subnet=s1) as p: self._add_external_gateway_to_router( r['router']['id'], s2['subnet']['network_id']) res = self._router_interface_action( 'add', r['router']['id'], None, p['port']['id'], expected_code=exc.HTTPBadRequest.code) self.assertIn('NeutronError', res) def test_create_floatingip_invalid_fixed_ipv6_address_returns_400(self): self.skipTest('Failed because of illegal port id') def test_create_floatingip_with_router_interface_device_owner_fail(self): # This tests that an error is raised when trying to assign a router # interface port with floatingip. with self.subnet(cidr='30.0.0.0/24', gateway_ip=None) as private_sub: with self.port( subnet=private_sub, device_owner=constants.DEVICE_OWNER_ROUTER_INTF) as p: port_id = p['port']['id'] with self.router() as r: self._router_interface_action('add', r['router']['id'], None, port_id) with self.external_network() as public_net, self.subnet( network=public_net, cidr='12.0.0.0/24') as public_sub: self._add_external_gateway_to_router( r['router']['id'], public_sub['subnet']['network_id']) self._make_floatingip( self.fmt, public_sub['subnet']['network_id'], port_id=port_id, http_status=exc.HTTPBadRequest.code) def test_assign_floatingip_to_router_interface_device_owner_fail(self): # This tests that an error is raised when trying to assign a router # interface port with floatingip. with self.subnet(cidr='30.0.0.0/24', gateway_ip=None) as private_sub: with self.port( subnet=private_sub, device_owner=constants.DEVICE_OWNER_ROUTER_INTF) as p: port_id = p['port']['id'] with self.router() as r: self._router_interface_action('add', r['router']['id'], None, port_id) with self.external_network() as public_net, self.subnet( network=public_net, cidr='12.0.0.0/24') as public_sub: self._add_external_gateway_to_router( r['router']['id'], public_sub['subnet']['network_id']) fip = self._make_floatingip(self.fmt, public_sub[ 'subnet']['network_id']) self._update('floatingips', fip['floatingip'][ 'id'], {'floatingip': {'port_id': port_id}}, expected_code=exc.HTTPBadRequest.code) class ExtGwModeTestCase(test_ext_gw_mode.ExtGwModeIntTestCase, L3NatTest): def test_router_gateway_set_fail_after_port_create(self): self.skipTest("TBD") @common_v3.with_external_subnet def _test_router_update_ext_gwinfo(self, snat_input_value, snat_expected_value=False, expected_http_code=exc.HTTPOk.code): return super(ExtGwModeTestCase, self)._test_router_update_ext_gwinfo( snat_input_value, snat_expected_value=snat_expected_value, expected_http_code=expected_http_code) @common_v3.with_external_subnet def test_router_gateway_set_retry(self): super(ExtGwModeTestCase, self).test_router_gateway_set_retry() @common_v3.with_external_subnet def _test_router_create_show_ext_gwinfo(self, *args, **kwargs): return super(ExtGwModeTestCase, self)._test_router_create_show_ext_gwinfo(*args, **kwargs)