A set of Neutron drivers for the VMware NSX.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

4629 lines
221 KiB

# Copyright (c) 2012 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
from eventlet import greenthread
import mock
import netaddr
from neutron.api.rpc.callbacks import events as callbacks_events
from neutron.api.v2 import attributes
from neutron.common import utils
from neutron import context
from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.extensions import dvr as dist_router
from neutron.extensions import external_net
from neutron.extensions import l3
from neutron.extensions import l3_ext_gw_mode
from neutron.extensions import portbindings
from neutron.extensions import providernet as pnet
from neutron.extensions import securitygroup as secgrp
from neutron import manager
from neutron.objects.qos import policy as qos_pol
from neutron.plugins.common import constants as plugin_const
from neutron.services.qos import qos_consts
from neutron.tests.unit import _test_extension_portbindings as test_bindings
import neutron.tests.unit.db.test_allowedaddresspairs_db as test_addr_pair
import neutron.tests.unit.db.test_db_base_plugin_v2 as test_plugin
import neutron.tests.unit.extensions.test_l3 as test_l3_plugin
import neutron.tests.unit.extensions.test_l3_ext_gw_mode as test_ext_gw_mode
import neutron.tests.unit.extensions.test_portsecurity as test_psec
import neutron.tests.unit.extensions.test_securitygroup as ext_sg
from neutron.tests.unit import testlib_api
from neutron_lib.api import validators
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from oslo_config import cfg
from oslo_utils import uuidutils
import six
import webob.exc
from vmware_nsx._i18n import _
from vmware_nsx.common import exceptions as nsxv_exc
from vmware_nsx.common import nsx_constants
from vmware_nsx.common import utils as c_utils
from vmware_nsx.db import nsxv_db
from vmware_nsx.dvs import dvs
from vmware_nsx.dvs import dvs_utils
from vmware_nsx.extensions import routersize as router_size
from vmware_nsx.extensions import routertype as router_type
from vmware_nsx.extensions import securitygrouplogging
from vmware_nsx.extensions import vnicindex as ext_vnic_idx
from vmware_nsx.plugins.nsx_v import availability_zones as nsx_az
from vmware_nsx.plugins.nsx_v.drivers import (
exclusive_router_driver as ex_router_driver)
from vmware_nsx.plugins.nsx_v.drivers import (
shared_router_driver as router_driver)
from vmware_nsx.plugins.nsx_v import md_proxy
from vmware_nsx.plugins.nsx_v.vshield.common import constants as vcns_const
from vmware_nsx.plugins.nsx_v.vshield import edge_appliance_driver
from vmware_nsx.plugins.nsx_v.vshield import edge_firewall_driver
from vmware_nsx.plugins.nsx_v.vshield import edge_utils
from vmware_nsx.plugins.nsx_v.vshield.tasks import (
constants as task_constants)
from vmware_nsx.services.qos.nsx_v import utils as qos_utils
from vmware_nsx.tests import unit as vmware
from vmware_nsx.tests.unit.extensions import test_vnic_index
from vmware_nsx.tests.unit.nsx_v.vshield import fake_vcns
from vmware_nsx.tests.unit import test_utils
PLUGIN_NAME = 'vmware_nsx.plugin.NsxVPlugin'
_uuid = uuidutils.generate_uuid
class NsxVPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
def _create_network(self, fmt, name, admin_state_up,
arg_list=None, providernet_args=None,
set_context=False, tenant_id=None,
**kwargs):
tenant_id = tenant_id or self._tenant_id
data = {'network': {'name': name,
'admin_state_up': admin_state_up,
'tenant_id': tenant_id}}
# Fix to allow the router:external attribute and any other
# attributes containing a colon to be passed with
# a double underscore instead
kwargs = dict((k.replace('__', ':'), v) for k, v in kwargs.items())
if external_net.EXTERNAL in kwargs:
arg_list = (external_net.EXTERNAL, ) + (arg_list or ())
attrs = kwargs
if providernet_args:
attrs.update(providernet_args)
for arg in (('admin_state_up', 'tenant_id', 'shared') +
(arg_list or ())):
# Arg must be present and not empty
if arg in kwargs:
data['network'][arg] = kwargs[arg]
network_req = self.new_create_request('networks', data, fmt)
if set_context and tenant_id:
# create a specific auth context for this request
network_req.environ['neutron.context'] = context.Context(
'', tenant_id)
return network_req.get_response(self.api)
@mock.patch.object(edge_utils.EdgeManager, '_deploy_edge')
def setUp(self, mock_deploy_edge,
plugin=PLUGIN_NAME,
ext_mgr=None,
service_plugins=None):
test_utils.override_nsx_ini_test()
mock_vcns = mock.patch(vmware.VCNS_NAME, autospec=True)
mock_vcns_instance = mock_vcns.start()
self.fc2 = fake_vcns.FakeVcns()
mock_vcns_instance.return_value = self.fc2
edge_utils.query_dhcp_service_config = mock.Mock(return_value=[])
self.mock_create_dhcp_service = mock.patch("%s.%s" % (
vmware.EDGE_MANAGE_NAME, 'create_dhcp_edge_service'))
self.mock_create_dhcp_service.start()
mock_update_dhcp_service = mock.patch("%s.%s" % (
vmware.EDGE_MANAGE_NAME, 'update_dhcp_edge_service'))
mock_update_dhcp_service.start()
mock_delete_dhcp_service = mock.patch("%s.%s" % (
vmware.EDGE_MANAGE_NAME, 'delete_dhcp_edge_service'))
mock_delete_dhcp_service.start()
self.default_res_pool = 'respool-28'
cfg.CONF.set_override("resource_pool_id", self.default_res_pool,
group="nsxv")
super(NsxVPluginV2TestCase, self).setUp(plugin=plugin,
ext_mgr=ext_mgr)
self.addCleanup(self.fc2.reset_all)
plugin_instance = manager.NeutronManager.get_plugin()
plugin_instance.real_get_edge = plugin_instance._get_edge_id_by_rtr_id
plugin_instance._get_edge_id_by_rtr_id = mock.Mock()
plugin_instance._get_edge_id_by_rtr_id.return_value = False
plugin_instance.edge_manager.is_dhcp_opt_enabled = True
def _get_core_plugin_with_dvs(self):
# enable dvs features to allow policy with QOS
cfg.CONF.set_default('use_dvs_features', True, 'nsxv')
plugin = manager.NeutronManager.get_plugin()
with mock.patch.object(dvs_utils, 'dvs_create_session'):
with mock.patch.object(dvs.DvsManager, '_get_dvs_moref'):
plugin._dvs = dvs.DvsManager()
return plugin
def test_get_vlan_network_name(self):
p = manager.NeutronManager.get_plugin()
net_id = uuidutils.generate_uuid()
dvs_id = 'dvs-10'
net = {'name': '',
'id': net_id}
# Empty net['name'] should yield dvs_id-net_id as a name for the
# port group.
expected = '%s-%s' % (dvs_id, net_id)
self.assertEqual(expected,
p._get_vlan_network_name(net, dvs_id))
# If network name is provided then it should yield
# dvs_id-net_name-net_id as a name for the port group.
net = {'name': 'pele',
'id': net_id}
expected = '%s-%s-%s' % (dvs_id, 'pele', net_id)
self.assertEqual(expected,
p._get_vlan_network_name(net, dvs_id))
name = 'X' * 500
net = {'name': name,
'id': net_id}
expected = '%s-%s-%s' % (dvs_id, name[:35], net_id)
self.assertEqual(expected,
p._get_vlan_network_name(net, dvs_id))
def test_get_vlan_network_name_with_net_name_missing(self):
p = manager.NeutronManager.get_plugin()
net_id = uuidutils.generate_uuid()
dvs_id = 'dvs-10'
net = {'id': net_id}
# Missing net['name'] should yield dvs_id-net_id as a name for the
# port group.
expected = '%s-%s' % (dvs_id, net_id)
self.assertEqual(expected,
p._get_vlan_network_name(net, dvs_id))
class TestNetworksV2(test_plugin.TestNetworksV2, NsxVPluginV2TestCase):
def test_create_network_vlan_transparent(self):
self.skipTest("Currently no support in plugin for this")
def _test_create_bridge_network(self, vlan_id=0):
net_type = vlan_id and 'vlan' or 'flat'
name = 'bridge_net'
expected = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', False),
(pnet.NETWORK_TYPE, net_type),
(pnet.PHYSICAL_NETWORK, 'tzuuid'),
(pnet.SEGMENTATION_ID, vlan_id)]
providernet_args = {pnet.NETWORK_TYPE: net_type,
pnet.PHYSICAL_NETWORK: 'tzuuid'}
if vlan_id:
providernet_args[pnet.SEGMENTATION_ID] = vlan_id
with self.network(name=name,
providernet_args=providernet_args,
arg_list=(pnet.NETWORK_TYPE,
pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID)) as net:
for k, v in expected:
self.assertEqual(net['network'][k], v)
def test_create_bridge_network(self):
self._test_create_bridge_network()
def test_create_bridge_vlan_network(self):
self._test_create_bridge_network(vlan_id=123)
def test_create_bridge_vlan_network_outofrange_returns_400(self):
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_bridge_network(vlan_id=5000)
self.assertEqual(ctx_manager.exception.code, 400)
def test_create_external_portgroup_network(self):
name = 'ext_net'
expected = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', False),
(external_net.EXTERNAL, True),
(pnet.NETWORK_TYPE, 'portgroup'),
(pnet.PHYSICAL_NETWORK, 'tzuuid')]
providernet_args = {pnet.NETWORK_TYPE: 'portgroup',
pnet.PHYSICAL_NETWORK: 'tzuuid',
external_net.EXTERNAL: True}
with self.network(name=name,
providernet_args=providernet_args,
arg_list=(pnet.NETWORK_TYPE,
pnet.PHYSICAL_NETWORK,
external_net.EXTERNAL)) as net:
for k, v in expected:
self.assertEqual(net['network'][k], v)
def test_delete_network_after_removing_subnet(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
fmt = 'json'
# Create new network
res = self._create_network(fmt=fmt, name='net',
admin_state_up=True)
network = self.deserialize(fmt, res)
subnet = self._make_subnet(fmt, network, gateway_ip,
cidr, ip_version=4)
req = self.new_delete_request('subnets', subnet['subnet']['id'])
sub_del_res = req.get_response(self.api)
self.assertEqual(sub_del_res.status_int, 204)
req = self.new_delete_request('networks', network['network']['id'])
net_del_res = req.get_response(self.api)
self.assertEqual(net_del_res.status_int, 204)
def test_list_networks_with_shared(self):
with self.network(name='net1'):
with self.network(name='net2', shared=True):
req = self.new_list_request('networks')
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(len(res['networks']), 2)
req_2 = self.new_list_request('networks')
req_2.environ['neutron.context'] = context.Context('',
'somebody')
res = self.deserialize('json', req_2.get_response(self.api))
# tenant must see a single network
self.assertEqual(len(res['networks']), 1)
def test_create_network_name_exceeds_40_chars(self):
name = 'this_is_a_network_whose_name_is_longer_than_40_chars'
with self.network(name=name) as net:
# Assert neutron name is not truncated
self.assertEqual(net['network']['name'], name)
def test_update_network_with_admin_false(self):
data = {'network': {'admin_state_up': False}}
with self.network() as net:
plugin = manager.NeutronManager.get_plugin()
self.assertRaises(NotImplementedError,
plugin.update_network,
context.get_admin_context(),
net['network']['id'], data)
def test_create_extend_dvs_provider_network(self):
name = 'provider_net'
expected = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', False),
(pnet.NETWORK_TYPE, 'flat'),
(pnet.PHYSICAL_NETWORK, 'dvs-uuid')]
providernet_args = {pnet.NETWORK_TYPE: 'flat',
pnet.PHYSICAL_NETWORK: 'dvs-uuid'}
with self.network(name=name,
providernet_args=providernet_args,
arg_list=(pnet.NETWORK_TYPE,
pnet.PHYSICAL_NETWORK)) as net:
for k, v in expected:
self.assertEqual(net['network'][k], v)
def test_create_same_vlan_network_with_different_dvs(self):
name = 'dvs-provider-net'
expected = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', False),
(pnet.NETWORK_TYPE, 'vlan'),
(pnet.SEGMENTATION_ID, 43),
(pnet.PHYSICAL_NETWORK, 'dvs-uuid-1')]
providernet_args = {pnet.NETWORK_TYPE: 'vlan',
pnet.SEGMENTATION_ID: 43,
pnet.PHYSICAL_NETWORK: 'dvs-uuid-1'}
with self.network(name=name,
providernet_args=providernet_args,
arg_list=(pnet.NETWORK_TYPE,
pnet.SEGMENTATION_ID,
pnet.PHYSICAL_NETWORK)) as net:
for k, v in expected:
self.assertEqual(net['network'][k], v)
expected_same_vlan = [(pnet.NETWORK_TYPE, 'vlan'),
(pnet.SEGMENTATION_ID, 43),
(pnet.PHYSICAL_NETWORK, 'dvs-uuid-2')]
providernet_args_1 = {pnet.NETWORK_TYPE: 'vlan',
pnet.SEGMENTATION_ID: 43,
pnet.PHYSICAL_NETWORK: 'dvs-uuid-2'}
with self.network(name=name,
providernet_args=providernet_args_1,
arg_list=(pnet.NETWORK_TYPE,
pnet.SEGMENTATION_ID,
pnet.PHYSICAL_NETWORK)) as net1:
for k, v in expected_same_vlan:
self.assertEqual(net1['network'][k], v)
def test_create_vlan_network_with_multiple_dvs(self):
name = 'multi-dvs-vlan-net'
providernet_args = {pnet.NETWORK_TYPE: 'vlan',
pnet.SEGMENTATION_ID: 100,
pnet.PHYSICAL_NETWORK: 'dvs-1, dvs-2, dvs-3'}
p = manager.NeutronManager.get_plugin()
with mock.patch.object(
p, '_create_vlan_network_at_backend',
# Return three netmorefs as side effect
side_effect=[_uuid(), _uuid(), _uuid()]) as vlan_net_call:
with self.network(name=name,
providernet_args=providernet_args,
arg_list=(pnet.NETWORK_TYPE,
pnet.SEGMENTATION_ID,
pnet.PHYSICAL_NETWORK)):
# _create_vlan_network_at_backend is expected to be called
# three times since we have three DVS IDs in the physical
# network attribute.
self.assertEqual(3, vlan_net_call.call_count)
def test_create_vlan_network_with_multiple_dvs_backend_failure(self):
net_data = {'name': 'vlan-net',
'tenant_id': self._tenant_id,
pnet.NETWORK_TYPE: 'vlan',
pnet.SEGMENTATION_ID: 100,
pnet.PHYSICAL_NETWORK: 'dvs-1, dvs-2, dvs-3'}
network = {'network': net_data}
p = manager.NeutronManager.get_plugin()
with mock.patch.object(
p, '_create_vlan_network_at_backend',
# Return two successful netmorefs and fail on the backend
# for the third netmoref creation as side effect.
side_effect=[_uuid(), _uuid(),
nsxv_exc.NsxPluginException(err_msg='')]):
with mock.patch.object(
p, '_delete_backend_network') as delete_net_call:
self.assertRaises(nsxv_exc.NsxPluginException,
p.create_network,
context.get_admin_context(),
network)
# Two successfully created port groups should be rolled back
# on the failure of third port group creation.
self.assertEqual(2, delete_net_call.call_count)
def test_create_vlan_network_with_multiple_dvs_not_found_failure(self):
net_data = {'name': 'vlan-net',
'tenant_id': self._tenant_id,
pnet.NETWORK_TYPE: 'vlan',
pnet.SEGMENTATION_ID: 100,
pnet.PHYSICAL_NETWORK: 'dvs-1, dvs-2, dvs-3'}
network = {'network': net_data}
p = manager.NeutronManager.get_plugin()
with mock.patch.object(
p, '_validate_provider_create',
side_effect=[nsxv_exc.NsxResourceNotFound(res_id='dvs-2',
res_name='dvs_id')]):
with mock.patch.object(
p, '_create_vlan_network_at_backend') as create_net_call:
self.assertRaises(nsxv_exc.NsxResourceNotFound,
p.create_network,
context.get_admin_context(),
network)
# Verify no port group is created on the backend.
self.assertEqual(0, create_net_call.call_count)
def test_create_vlan_network_with_multiple_dvs_ignore_duplicate_dvs(self):
name = 'multi-dvs-vlan-net'
providernet_args = {pnet.NETWORK_TYPE: 'vlan',
pnet.SEGMENTATION_ID: 100,
pnet.PHYSICAL_NETWORK: 'dvs-1, dvs-2, dvs-1'}
p = manager.NeutronManager.get_plugin()
with mock.patch.object(
p, '_create_vlan_network_at_backend',
# Return two netmorefs as side effect
side_effect=[_uuid(), _uuid()]) as vlan_net_call:
with self.network(name=name,
providernet_args=providernet_args,
arg_list=(pnet.NETWORK_TYPE,
pnet.SEGMENTATION_ID,
pnet.PHYSICAL_NETWORK)):
# _create_vlan_network_at_backend is expected to be called
# two times since we have only two unique DVS IDs in the
# physical network attribute.
self.assertEqual(2, vlan_net_call.call_count)
def test_get_dvs_ids_for_multiple_dvs_vlan_network(self):
p = manager.NeutronManager.get_plugin()
# If no DVS-ID is provided as part of physical network, return
# global DVS-ID configured in nsx.ini
physical_network = constants.ATTR_NOT_SPECIFIED
self.assertEqual(['fake_dvs_id'], p._get_dvs_ids(physical_network))
# If DVS-IDs are provided as part of physical network as a comma
# separated string, return them as a list of DVS-IDs.
physical_network = 'dvs-1,dvs-2, dvs-3'
expected_dvs_ids = ['dvs-1', 'dvs-2', 'dvs-3']
self.assertEqual(expected_dvs_ids,
sorted(p._get_dvs_ids(physical_network)))
# Ignore extra commas ',' in the physical_network attribute.
physical_network = ',,,dvs-1,dvs-2,, dvs-3,'
expected_dvs_ids = ['dvs-1', 'dvs-2', 'dvs-3']
self.assertEqual(expected_dvs_ids,
sorted(p._get_dvs_ids(physical_network)))
# Ignore duplicate DVS-IDs in the physical_network attribute.
physical_network = ',,,dvs-1,dvs-2,, dvs-2,'
expected_dvs_ids = ['dvs-1', 'dvs-2']
self.assertEqual(expected_dvs_ids,
sorted(p._get_dvs_ids(physical_network)))
def test_create_vxlan_with_tz_provider_network(self):
name = 'provider_net_vxlan'
expected = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', False),
(pnet.NETWORK_TYPE, 'vxlan'),
(pnet.PHYSICAL_NETWORK, 'vdnscope-2')]
providernet_args = {pnet.NETWORK_TYPE: 'vxlan',
pnet.PHYSICAL_NETWORK: 'vdnscope-2'}
with self.network(name=name,
providernet_args=providernet_args,
arg_list=(pnet.NETWORK_TYPE,
pnet.PHYSICAL_NETWORK)) as net:
for k, v in expected:
self.assertEqual(net['network'][k], v)
def test_create_vxlan_with_tz_provider_network_not_found_fail(self):
name = 'provider_net_vxlan'
data = {'network': {
'name': name,
'tenant_id': self._tenant_id,
pnet.SEGMENTATION_ID: constants.ATTR_NOT_SPECIFIED,
pnet.NETWORK_TYPE: 'vxlan',
pnet.PHYSICAL_NETWORK: 'vdnscope-2'}}
p = manager.NeutronManager.get_plugin()
with mock.patch.object(p.nsx_v.vcns, 'validate_vdn_scope',
side_effect=[False]):
self.assertRaises(nsxv_exc.NsxResourceNotFound,
p.create_network,
context.get_admin_context(),
data)
def test_create_network_with_qos_no_dvs_fail(self):
# network creation should fail if the qos policy parameter exists,
# and no use_dvs_features configured
data = {'network': {
'name': 'test-qos',
'tenant_id': self._tenant_id,
'qos_policy_id': _uuid()}}
plugin = manager.NeutronManager.get_plugin()
self.assertRaises(n_exc.InvalidInput,
plugin.create_network,
context.get_admin_context(),
data)
def test_update_network_with_qos_no_dvs_fail(self):
# network update should fail if the qos policy parameter exists,
# and no use_dvs_features configured
data = {'network': {'qos_policy_id': _uuid()}}
with self.network() as net:
plugin = manager.NeutronManager.get_plugin()
self.assertRaises(n_exc.InvalidInput,
plugin.update_network,
context.get_admin_context(),
net['network']['id'], data)
@mock.patch.object(dvs.DvsManager, 'update_port_groups_config')
@mock.patch.object(qos_utils.NsxVQosRule, '_init_from_policy_id')
def test_create_network_with_qos_policy(self,
fake_init_from_policy,
fake_dvs_update):
# enable dvs features to allow policy with QOS
plugin = self._get_core_plugin_with_dvs()
ctx = context.get_admin_context()
# Mark init as complete, as otherwise QoS won't be called
plugin.init_is_complete = True
# fake policy id
policy_id = _uuid()
data = {'network': {
'name': 'test-qos',
'tenant_id': self._tenant_id,
'qos_policy_id': policy_id,
'port_security_enabled': False,
'admin_state_up': False,
'shared': False
}}
with mock.patch('vmware_nsx.services.qos.common.utils.'
'get_network_policy_id',
return_value=policy_id):
# create the network - should succeed and translate the policy id
net = plugin.create_network(ctx, data)
self.assertEqual(policy_id, net[qos_consts.QOS_POLICY_ID])
fake_init_from_policy.assert_called_once_with(ctx, policy_id)
self.assertTrue(fake_dvs_update.called)
# Get network should also return the qos policy id
net2 = plugin.get_network(ctx, net['id'])
self.assertEqual(policy_id, net2[qos_consts.QOS_POLICY_ID])
@mock.patch.object(dvs.DvsManager, 'update_port_groups_config')
@mock.patch.object(qos_utils.NsxVQosRule, '_init_from_policy_id')
def test_update_network_with_qos_policy(self,
fake_init_from_policy,
fake_dvs_update):
# enable dvs features to allow policy with QOS
plugin = self._get_core_plugin_with_dvs()
ctx = context.get_admin_context()
# create the network without qos policy
data = {'network': {
'name': 'test-qos',
'tenant_id': self._tenant_id,
'port_security_enabled': False,
'admin_state_up': True,
'shared': False
}}
net = plugin.create_network(ctx, data)
# fake policy id
policy_id = _uuid()
data['network']['qos_policy_id'] = policy_id
# update the network - should succeed and translate the policy id
with mock.patch('vmware_nsx.services.qos.common.utils.'
'get_network_policy_id',
return_value=policy_id):
res = plugin.update_network(ctx, net['id'], data)
self.assertEqual(policy_id, res[qos_consts.QOS_POLICY_ID])
fake_init_from_policy.assert_called_once_with(ctx, policy_id)
self.assertTrue(fake_dvs_update.called)
# Get network should also return the qos policy id
net2 = plugin.get_network(ctx, net['id'])
self.assertEqual(policy_id, net2[qos_consts.QOS_POLICY_ID])
@mock.patch.object(dvs.DvsManager, 'update_port_groups_config')
@mock.patch.object(qos_utils.NsxVQosRule, '_init_from_policy_id')
def test_network_with_updated_qos_policy(self,
fake_init_from_policy,
fake_dvs_update):
# enable dvs features to allow policy with QOS
plugin = self._get_core_plugin_with_dvs()
ctx = context.get_admin_context()
# create the network with qos policy
policy_id = _uuid()
data = {'network': {
'name': 'test-qos',
'tenant_id': self._tenant_id,
'qos_policy_id': policy_id,
'port_security_enabled': False,
'admin_state_up': True,
'shared': False
}}
net = plugin.create_network(ctx, data)
# reset fake methods called flag
fake_init_from_policy.called = False
fake_dvs_update.called = False
# fake QoS policy obj:
fake_policy = qos_pol.QosPolicy()
fake_policy.id = policy_id
fake_policy.rules = []
# call the plugin notification callback as if the network was updated
with mock.patch.object(qos_pol.QosPolicy, "get_object",
return_value=fake_policy):
with mock.patch.object(qos_pol.QosPolicy, "get_bound_networks",
return_value=[net["id"]]):
plugin._handle_qos_notification(fake_policy,
callbacks_events.UPDATED)
# make sure the policy data was read, and the dvs was updated
self.assertTrue(fake_init_from_policy.called)
self.assertTrue(fake_dvs_update.called)
def test_create_network_with_bad_az_hint(self):
p = manager.NeutronManager.get_plugin()
ctx = context.get_admin_context()
data = {'network': {
'name': 'test-qos',
'tenant_id': self._tenant_id,
'port_security_enabled': False,
'admin_state_up': True,
'shared': False,
'availability_zone_hints': ['bad_hint']
}}
self.assertRaises(n_exc.NeutronException,
p.create_network,
ctx, data)
def test_create_network_with_az_hint(self):
az_name = 'az7'
az_config = az_name + ':respool-7:datastore-7:False'
cfg.CONF.set_override('availability_zones', [az_config], group="nsxv")
p = manager.NeutronManager.get_plugin()
p._availability_zones_data = nsx_az.ConfiguredAvailabilityZones()
ctx = context.get_admin_context()
data = {'network': {
'name': 'test-qos',
'tenant_id': self._tenant_id,
'port_security_enabled': False,
'admin_state_up': True,
'shared': False,
'availability_zone_hints': [az_name]
}}
# network creation should succeed
net = p.create_network(ctx, data)
self.assertEqual([az_name],
net['availability_zone_hints'])
# the availability zone is still empty until subnet creation
self.assertEqual([],
net['availability_zones'])
class TestVnicIndex(NsxVPluginV2TestCase,
test_vnic_index.VnicIndexDbTestCase):
def test_update_port_twice_with_the_same_index(self):
"""Tests that updates which does not modify the port vnic
index association do not produce any errors
"""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
res = self._port_index_update(port['port']['id'], 2)
self.assertEqual(2, res['port'][ext_vnic_idx.VNIC_INDEX])
res = self._port_index_update(port['port']['id'], 2)
self.assertEqual(2, res['port'][ext_vnic_idx.VNIC_INDEX])
class TestPortsV2(NsxVPluginV2TestCase,
test_plugin.TestPortsV2,
test_bindings.PortBindingsTestCase,
test_bindings.PortBindingsHostTestCaseMixin,
test_bindings.PortBindingsVnicTestCaseMixin):
VIF_TYPE = nsx_constants.VIF_TYPE_DVS
HAS_PORT_FILTER = True
def test_duplicate_mac_generation(self):
# simulate duplicate mac generation to make sure DBDuplicate is retried
responses = ['12:34:56:78:00:00', '12:34:56:78:00:00',
'12:34:56:78:00:01']
with mock.patch('neutron.common.utils.get_random_mac',
side_effect=responses) as grand_mac:
with self.subnet(enable_dhcp=False) as s:
with self.port(subnet=s) as p1, self.port(subnet=s) as p2:
self.assertEqual('12:34:56:78:00:00',
p1['port']['mac_address'])
self.assertEqual('12:34:56:78:00:01',
p2['port']['mac_address'])
self.assertEqual(3, grand_mac.call_count)
def test_get_ports_count(self):
with self.port(), self.port(), self.port(), self.port() as p:
tenid = p['port']['tenant_id']
ctx = context.Context(user_id=None, tenant_id=tenid,
is_admin=False)
pl = manager.NeutronManager.get_plugin()
count = pl.get_ports_count(ctx, filters={'tenant_id': [tenid]})
# Each port above has subnet => we have an additional port
# for DHCP
self.assertEqual(8, count)
def test_requested_ips_only(self):
with self.subnet(enable_dhcp=False) as subnet:
fixed_ip_data = [{'ip_address': '10.0.0.2',
'subnet_id': subnet['subnet']['id']}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
ips_only = ['10.0.0.18', '10.0.0.20', '10.0.0.22', '10.0.0.21',
'10.0.0.3', '10.0.0.17', '10.0.0.19']
ports_to_delete = []
for i in ips_only:
kwargs = {"fixed_ips": [{'ip_address': i}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port = self.deserialize(self.fmt, res)
ports_to_delete.append(port)
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual(i, ips[0]['ip_address'])
self.assertEqual(subnet['subnet']['id'],
ips[0]['subnet_id'])
for p in ports_to_delete:
self._delete('ports', p['port']['id'])
def test_create_port_with_too_many_fixed_ips(self):
self.skipTest('DHCP only supports one binding')
def test_create_port_invalid_fixed_ip_address_v6_pd_slaac(self):
self.skipTest('No DHCP v6 Support yet')
def test_update_port_invalid_fixed_ip_address_v6_pd_slaac(self):
self.skipTest('No DHCP v6 Support yet')
def test_update_port_invalid_subnet_v6_pd_slaac(self):
self.skipTest('No DHCP v6 Support yet')
def test_update_port_mac_v6_slaac(self):
self.skipTest('No DHCP v6 Support yet')
def test_update_port_invalid_fixed_ip_address_v6_slaac(self):
self.skipTest('No DHCP v6 Support yet')
def test_update_port_excluding_ipv6_slaac_subnet_from_fixed_ips(self):
self.skipTest('No DHCP v6 Support yet')
def test_requested_subnet_id_v6_slaac(self):
self.skipTest('No DHCP v6 Support yet')
def test_ip_allocation_for_ipv6_subnet_slaac_address_mode(self):
self.skipTest('No DHCP v6 Support yet')
def test_requested_fixed_ip_address_v6_slaac_router_iface(self):
self.skipTest('No DHCP v6 Support yet')
def test_update_port_with_ipv6_slaac_subnet_in_fixed_ips(self):
self.skipTest('No DHCP v6 Support yet')
def test_requested_invalid_fixed_ip_address_v6_slaac(self):
self.skipTest('No DHCP v6 Support yet')
def test_delete_port_with_ipv6_slaac_address(self):
self.skipTest('No DHCP v6 Support yet')
def test_ip_allocation_for_ipv6_2_subnet_slaac_mode(self):
self.skipTest('No DHCP v6 Support yet')
def _test_create_port_with_ipv6_subnet_in_fixed_ips(self, addr_mode,
ipv6_pd=False):
self.skipTest('No DHCP v6 Support yet')
def test_create_port_anticipating_allocation(self):
with self.network(shared=True) as network:
with self.subnet(network=network, cidr='10.0.0.0/24',
enable_dhcp=False) as subnet:
fixed_ips = [{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.2'}]
self._create_port(self.fmt, network['network']['id'],
webob.exc.HTTPCreated.code,
fixed_ips=fixed_ips)
def test_update_port_mac_ip(self):
with self.subnet(enable_dhcp=False) as subnet:
updated_fixed_ips = [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.3'}]
self.check_update_port_mac(subnet=subnet,
updated_fixed_ips=updated_fixed_ips)
def test_list_ports(self):
# for this test we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet) as port1,\
self.port(subnet) as port2,\
self.port(subnet) as port3:
self._test_list_resources('port', [port1, port2, port3])
def test_list_ports_public_network(self):
with self.network(shared=True) as network:
with self.subnet(network, enable_dhcp=False) as subnet,\
self.port(subnet, tenant_id='tenant_1') as port1,\
self.port(subnet, tenant_id='tenant_2') as port2:
# Admin request - must return both ports
self._test_list_resources('port', [port1, port2])
# Tenant_1 request - must return single port
q_context = context.Context('', 'tenant_1')
self._test_list_resources('port', [port1],
neutron_context=q_context)
# Tenant_2 request - must return single port
q_context = context.Context('', 'tenant_2')
self._test_list_resources('port', [port2],
neutron_context=q_context)
def test_list_ports_with_pagination_emulated(self):
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_pagination_helper',
new=test_plugin._fake_get_pagination_helper)
helper_patcher.start()
cfg.CONF.set_default('allow_overlapping_ips', True)
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, mac_address='00:00:00:00:00:01') as port1,\
self.port(subnet, mac_address='00:00:00:00:00:02') as port2,\
self.port(subnet, mac_address='00:00:00:00:00:03') as port3:
self._test_list_with_pagination('port',
(port1, port2, port3),
('mac_address', 'asc'), 2, 2)
def test_list_ports_with_pagination_native(self):
if self._skip_native_pagination:
self.skipTest("Skip test for not implemented pagination feature")
cfg.CONF.set_default('allow_overlapping_ips', True)
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, mac_address='00:00:00:00:00:01') as port1,\
self.port(subnet, mac_address='00:00:00:00:00:02') as port2,\
self.port(subnet, mac_address='00:00:00:00:00:03') as port3:
self._test_list_with_pagination('port',
(port1, port2, port3),
('mac_address', 'asc'), 2, 2)
def test_list_ports_with_sort_emulated(self):
helper_patcher = mock.patch(
'neutron.api.v2.base.Controller._get_sorting_helper',
new=test_plugin._fake_get_sorting_helper)
helper_patcher.start()
cfg.CONF.set_default('allow_overlapping_ips', True)
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, admin_state_up='True',
mac_address='00:00:00:00:00:01') as port1,\
self.port(subnet, admin_state_up='False',
mac_address='00:00:00:00:00:02') as port2,\
self.port(subnet, admin_state_up='False',
mac_address='00:00:00:00:00:03') as port3:
self._test_list_with_sort('port', (port3, port2, port1),
[('admin_state_up', 'asc'),
('mac_address', 'desc')])
def test_list_ports_with_sort_native(self):
if self._skip_native_sorting:
self.skipTest("Skip test for not implemented sorting feature")
cfg.CONF.set_default('allow_overlapping_ips', True)
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, admin_state_up='True',
mac_address='00:00:00:00:00:01') as port1,\
self.port(subnet, admin_state_up='False',
mac_address='00:00:00:00:00:02') as port2,\
self.port(subnet, admin_state_up='False',
mac_address='00:00:00:00:00:03') as port3:
self._test_list_with_sort('port', (port3, port2, port1),
[('admin_state_up', 'asc'),
('mac_address', 'desc')])
def test_update_port_delete_ip(self):
# This test case overrides the default because the nsx plugin
# implements port_security/security groups and it is not allowed
# to remove an ip address from a port unless the security group
# is first removed.
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': [],
secgrp.SECURITYGROUPS: []}}
req = self.new_update_request('ports',
data, port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up'])
self.assertEqual(res['port']['fixed_ips'],
data['port']['fixed_ips'])
def _update_port_index(self, port_id, device_id, index):
data = {'port': {'device_id': device_id, 'vnic_index': index}}
req = self.new_update_request('ports',
data, port_id)
res = self.deserialize('json', req.get_response(self.api))
return res
@mock.patch.object(edge_utils.EdgeManager, 'delete_dhcp_binding')
def test_update_port_index(self, delete_dhcp_binding):
q_context = context.Context('', 'tenant_1')
device_id = _uuid()
with self.subnet() as subnet:
with self.port(subnet=subnet,
device_id=device_id,
device_owner='compute:None') as port:
self.assertIsNone(port['port']['vnic_index'])
vnic_index = 3
res = self._update_port_index(
port['port']['id'], device_id, vnic_index)
self.assertEqual(vnic_index, res['port']['vnic_index'])
# Updating the vnic_index to None implies the vnic does
# no longer obtain the addresses associated with this port,
# we need to inactivate previous addresses configurations for
# this vnic in the context of this network spoofguard policy.
self.fc2.inactivate_vnic_assigned_addresses = (
mock.Mock().inactivate_vnic_assigned_addresses)
policy_id = nsxv_db.get_spoofguard_policy_id(
q_context.session, port['port']['network_id'])
res = self._update_port_index(port['port']['id'], '', None)
vnic_id = '%s.%03d' % (device_id, vnic_index)
(self.fc2.inactivate_vnic_assigned_addresses.
assert_called_once_with(policy_id, vnic_id))
self.assertTrue(delete_dhcp_binding.called)
def test_update_port_with_compute_device_owner(self):
"""
Test that DHCP binding is created when ports 'device_owner'
is updated to compute, for example when attaching an interface to a
instance with existing port.
"""
with self.port() as port:
with mock.patch(PLUGIN_NAME + '._create_dhcp_static_binding'):
update = {'port': {'device_owner'}}
self.new_update_request('ports',
update, port['port']['id'])
def test_no_more_port_exception(self):
with self.subnet(enable_dhcp=False, cidr='10.0.0.0/31',
gateway_ip=None) as subnet:
id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, id)
data = self.deserialize(self.fmt, res)
msg = str(n_exc.IpAddressGenerationFailure(net_id=id))
self.assertEqual(data['NeutronError']['message'], msg)
self.assertEqual(res.status_int, webob.exc.HTTPConflict.code)
def test_ports_vif_host(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
host_arg = {portbindings.HOST_ID: self.hostname}
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, name='name1',
arg_list=(portbindings.HOST_ID,), **host_arg),\
self.port(subnet, name='name2'):
ctx = context.get_admin_context()
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for port in ports:
if port['name'] == 'name1':
self._check_response_portbindings_host(port)
else:
self.assertFalse(port[portbindings.HOST_ID])
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for non_admin_port in ports:
self._check_response_no_portbindings_host(non_admin_port)
def test_ports_vif_host_update(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
host_arg = {portbindings.HOST_ID: self.hostname}
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, name='name1',
arg_list=(portbindings.HOST_ID,),
**host_arg) as port1,\
self.port(subnet, name='name2') as port2:
data = {'port': {portbindings.HOST_ID: 'testhosttemp'}}
req = self.new_update_request(
'ports', data, port1['port']['id'])
req.get_response(self.api)
req = self.new_update_request(
'ports', data, port2['port']['id'])
ctx = context.get_admin_context()
req.get_response(self.api)
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for port in ports:
self.assertEqual('testhosttemp', port[portbindings.HOST_ID])
def test_ports_vif_details(self):
plugin = manager.NeutronManager.get_plugin()
cfg.CONF.set_default('allow_overlapping_ips', True)
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet), self.port(subnet):
ctx = context.get_admin_context()
ports = plugin.get_ports(ctx)
self.assertEqual(len(ports), 2)
for port in ports:
self._check_response_portbindings(port)
# By default user is admin - now test non admin user
ctx = self._get_non_admin_context()
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(len(ports), 2)
for non_admin_port in ports:
self._check_response_no_portbindings(non_admin_port)
def test_ports_vnic_type(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type}
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, name='name1',
arg_list=(portbindings.VNIC_TYPE,), **vnic_arg),\
self.port(subnet, name='name2'):
ctx = context.get_admin_context()
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for port in ports:
if port['name'] == 'name1':
self._check_response_portbindings_vnic_type(port)
else:
self.assertEqual(portbindings.VNIC_NORMAL,
port[portbindings.VNIC_TYPE])
# By default user is admin - now test non admin user
ctx = context.Context(user_id=None,
tenant_id=self._tenant_id,
is_admin=False,
read_deleted="no")
ports = self._list('ports', neutron_context=ctx)['ports']
self.assertEqual(2, len(ports))
for non_admin_port in ports:
self._check_response_portbindings_vnic_type(non_admin_port)
def test_ports_vnic_type_list(self):
cfg.CONF.set_default('allow_overlapping_ips', True)
vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type}
with self.subnet(enable_dhcp=False) as subnet,\
self.port(subnet, name='name1',
arg_list=(portbindings.VNIC_TYPE,),
**vnic_arg) as port1,\
self.port(subnet, name='name2') as port2,\
self.port(subnet, name='name3',
arg_list=(portbindings.VNIC_TYPE,),
**vnic_arg) as port3:
self._test_list_resources('port', (port1, port2, port3),
query_params='%s=%s' % (
portbindings.VNIC_TYPE,
self.vnic_type))
def test_range_allocation(self):
with self.subnet(enable_dhcp=False, gateway_ip='10.0.0.3',
cidr='10.0.0.0/29') as subnet:
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port = self.deserialize(self.fmt, res)
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 5)
alloc = ['10.0.0.1', '10.0.0.2', '10.0.0.4', '10.0.0.5',
'10.0.0.6']
for ip in ips:
self.assertIn(ip['ip_address'], alloc)
self.assertEqual(ip['subnet_id'],
subnet['subnet']['id'])
alloc.remove(ip['ip_address'])
self.assertEqual(len(alloc), 0)
self._delete('ports', port['port']['id'])
with self.subnet(enable_dhcp=False, gateway_ip='11.0.0.6',
cidr='11.0.0.0/29') as subnet:
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet['subnet']['id']}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port = self.deserialize(self.fmt, res)
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 5)
alloc = ['11.0.0.1', '11.0.0.2', '11.0.0.3', '11.0.0.4',
'11.0.0.5']
for ip in ips:
self.assertIn(ip['ip_address'], alloc)
self.assertEqual(ip['subnet_id'],
subnet['subnet']['id'])
alloc.remove(ip['ip_address'])
self.assertEqual(len(alloc), 0)
self._delete('ports', port['port']['id'])
def test_requested_subnet_id_v4_and_v6(self):
with self.subnet(enable_dhcp=False) as subnet:
# Get an IPv4 and IPv6 address
tenant_id = subnet['subnet']['tenant_id']
net_id = subnet['subnet']['network_id']
res = self._create_subnet(
self.fmt,
tenant_id=tenant_id,
net_id=net_id,
cidr='2607:f0d0:1002:51::/124',
ip_version=6,
gateway_ip=constants.ATTR_NOT_SPECIFIED,
enable_dhcp=False)
subnet2 = self.deserialize(self.fmt, res)
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
{'subnet_id': subnet2['subnet']['id']}]}
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port3 = self.deserialize(self.fmt, res)
ips = port3['port']['fixed_ips']
cidr_v4 = subnet['subnet']['cidr']
cidr_v6 = subnet2['subnet']['cidr']
self.assertEqual(2, len(ips))
self._test_requested_port_subnet_ids(ips,
[subnet['subnet']['id'],
subnet2['subnet']['id']])
self._test_dual_stack_port_ip_addresses_in_subnets(ips,
cidr_v4,
cidr_v6)
res = self._create_port(self.fmt, net_id=net_id)
port4 = self.deserialize(self.fmt, res)
# Check that a v4 and a v6 address are allocated
ips = port4['port']['fixed_ips']
self.assertEqual(len(ips), 2)
self._test_requested_port_subnet_ids(ips,
[subnet['subnet']['id'],
subnet2['subnet']['id']])
self._test_dual_stack_port_ip_addresses_in_subnets(ips,
cidr_v4,
cidr_v6)
self._delete('ports', port3['port']['id'])
self._delete('ports', port4['port']['id'])
def test_update_port_update_ip(self):
"""Test update of port IP.
Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10.
"""
with self.subnet(enable_dhcp=False) as subnet:
fixed_ip_data = [{'ip_address': '10.0.0.2',
'subnet_id': subnet['subnet']['id']}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': "10.0.0.10"}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.10', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
def test_update_port_update_ips(self):
"""Update IP and associate new IP on port.
Check a port update with the specified subnet_id's. A IP address
will be allocated for each subnet_id.
"""
with self.subnet(enable_dhcp=False) as subnet:
with self.port(subnet=subnet) as port:
data = {'port': {'admin_state_up': False,
'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': '10.0.0.3'}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(data['port']['admin_state_up'],
res['port']['admin_state_up'])
ips = res['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.3', ips[0]['ip_address'], '10.0.0.3')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
def test_update_port_update_ip_dhcp(self):
#Test updating a port IP when the device owner is DHCP
with self.subnet(enable_dhcp=False) as subnet:
with self.port(subnet=subnet,
device_owner=constants.DEVICE_OWNER_DHCP) as port:
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': "10.0.0.10"}]}}
plugin = manager.NeutronManager.get_plugin()
ctx = context.get_admin_context()
with mock.patch.object(
plugin.edge_manager,
'update_dhcp_edge_service') as update_dhcp:
plugin.update_port(ctx, port['port']['id'], data)
self.assertTrue(update_dhcp.called)
def test_update_port_update_ip_compute(self):
#Test that updating a port IP succeed if the device owner starts
#with compute.
owner = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'xxx'
with self.subnet(enable_dhcp=False) as subnet:
with self.port(subnet=subnet, device_id=_uuid(),
device_owner=owner) as port:
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': "10.0.0.10"}]}}
plugin = manager.NeutronManager.get_plugin()
with mock.patch.object(
plugin.edge_manager,
'delete_dhcp_binding') as delete_dhcp:
with mock.patch.object(
plugin.edge_manager,
'create_static_binding') as create_static:
with mock.patch.object(
plugin.edge_manager,
'create_dhcp_bindings') as create_dhcp:
plugin.update_port(context.get_admin_context(),
port['port']['id'], data)
self.assertTrue(delete_dhcp.called)
self.assertTrue(create_static.called)
self.assertTrue(create_dhcp.called)
def test_update_port_update_ip_and_owner_fail(self):
#Test that updating a port IP and device owner at the same
#transaction fails
with self.subnet(enable_dhcp=False) as subnet:
with self.port(subnet=subnet,
device_owner='aaa') as port:
data = {'port': {'device_owner': 'bbb',
'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': "10.0.0.10"}]}}
plugin = manager.NeutronManager.get_plugin()
self.assertRaises(n_exc.BadRequest,
plugin.update_port,
context.get_admin_context(),
port['port']['id'], data)
def test_update_port_update_ip_router(self):
#Test that updating a port IP succeed if the device owner is a router
owner = constants.DEVICE_OWNER_ROUTER_GW
router_id = _uuid()
old_ip = '10.0.0.3'
new_ip = '10.0.0.10'
with self.subnet(enable_dhcp=False) as subnet:
with self.port(subnet=subnet, device_id=router_id,
device_owner=owner,
fixed_ips=[{'ip_address': old_ip}]) as port:
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': new_ip}]}}
plugin = manager.NeutronManager.get_plugin()
ctx = context.get_admin_context()
router_obj = router_driver.RouterSharedDriver(plugin)
with mock.patch.object(plugin, '_find_router_driver',
return_value=router_obj):
with mock.patch.object(
router_obj,
'update_router_interface_ip') as update_router:
port_id = port['port']['id']
plugin.update_port(ctx, port_id, data)
net_id = port['port']['network_id']
update_router.assert_called_once_with(
ctx,
router_id,
port_id,
net_id,
old_ip,
new_ip, "255.255.255.0")
def test_update_port_update_ip_unatached_router(self):
#Test that updating a port IP succeed if the device owner is a router
#and the shared router is not attached to any edge yet
owner = constants.DEVICE_OWNER_ROUTER_GW
router_id = _uuid()
old_ip = '10.0.0.3'
new_ip = '10.0.0.10'
with self.subnet(enable_dhcp=False) as subnet:
with self.port(subnet=subnet, device_id=router_id,
device_owner=owner,
fixed_ips=[{'ip_address': old_ip}]) as port:
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': new_ip}]}}
plugin = manager.NeutronManager.get_plugin()
ctx = context.get_admin_context()
router_obj = router_driver.RouterSharedDriver(plugin)
with mock.patch.object(plugin, '_find_router_driver',
return_value=router_obj):
# make sure the router will not be attached to an edge
with mock.patch.object(
edge_utils, 'get_router_edge_id',
return_value=None):
port_id = port['port']['id']
# The actual test here is that this call does not
# raise an exception
new_port = plugin.update_port(ctx, port_id, data)
ips = new_port['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], new_ip)
self.assertEqual(ips[0]['subnet_id'],
subnet['subnet']['id'])
def test_update_port_delete_ip_router(self):
#Test that deleting a port IP succeed if the device owner is a router
owner = constants.DEVICE_OWNER_ROUTER_GW
router_id = _uuid()
old_ip = '10.0.0.3'
with self.subnet(enable_dhcp=False) as subnet:
with self.port(subnet=subnet, device_id=router_id,
device_owner=owner,
fixed_ips=[{'ip_address': old_ip}]) as port:
data = {'port': {'fixed_ips': []}}
plugin = manager.NeutronManager.get_plugin()
ctx = context.get_admin_context()
router_obj = router_driver.RouterSharedDriver(plugin)
with mock.patch.object(plugin, '_find_router_driver',
return_value=router_obj):
with mock.patch.object(
router_obj,
'update_router_interface_ip') as update_router:
port_id = port['port']['id']
plugin.update_port(ctx, port_id, data)
net_id = port['port']['network_id']
update_router.assert_called_once_with(
ctx,
router_id,
port_id,
net_id,
old_ip,
None, None)
def test_update_port_update_ip_address_only(self):
with self.subnet(enable_dhcp=False) as subnet:
ip_address = '10.0.0.2'
fixed_ip_data = [{'ip_address': ip_address,
'subnet_id': subnet['subnet']['id']}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual(ip_address, ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': "10.0.0.10"},
{'ip_address': ip_address}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(2, len(ips))
self.assertIn({'ip_address': ip_address,
'subnet_id': subnet['subnet']['id']}, ips)
self.assertIn({'ip_address': '10.0.0.10',
'subnet_id': subnet['subnet']['id']}, ips)
def test_update_dhcp_port_with_exceeding_fixed_ips(self):
self.skipTest('Updating dhcp port IP is not supported')
def test_requested_subnet_id_v4_and_v6_slaac(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_router_port_ipv4_and_ipv6_slaac_no_fixed_ips(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_port_with_multiple_ipv4_and_ipv6_subnets(self):
# This test should fail as the NSX-v plugin should cause Neutron to
# return a 400 status code
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
super(TestPortsV2, self).\
test_create_port_with_multiple_ipv4_and_ipv6_subnets()
self.assertEqual(ctx_manager.exception.code, 400)
def test_list_ports_for_network_owner(self):
with self.network(tenant_id='tenant_1') as network:
with self.subnet(network, enable_dhcp=False) as subnet:
with self.port(subnet, tenant_id='tenant_1') as port1,\
self.port(subnet, tenant_id='tenant_2') as port2:
# network owner request, should return all ports
port_res = self._list_ports(
'json', set_context=True, tenant_id='tenant_1')
port_list = self.deserialize('json', port_res)['ports']
port_ids = [p['id'] for p in port_list]
self.assertEqual(2, len(port_list))
self.assertIn(port1['port']['id'], port_ids)
self.assertIn(port2['port']['id'], port_ids)
# another tenant request, only return ports belong to it
port_res = self._list_ports(
'json', set_context=True, tenant_id='tenant_2')
port_list = self.deserialize('json', port_res)['ports']
port_ids = [p['id'] for p in port_list]
self.assertEqual(1, len(port_list))
self.assertNotIn(port1['port']['id'], port_ids)
self.assertIn(port2['port']['id'], port_ids)
class TestSubnetsV2(NsxVPluginV2TestCase,
test_plugin.TestSubnetsV2):
def setUp(self,
plugin=PLUGIN_NAME,
ext_mgr=None,
service_plugins=None):
super(TestSubnetsV2, self).setUp()
self.context = context.get_admin_context()
def _test_subnet_update_ipv4_and_ipv6_pd_subnets(self, ra_addr_mode):
self.skipTest('No DHCP v6 Support yet')
def test__subnet_ipv6_not_supported(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
'gateway': 'fe80::1',
'cidr': '2607:f0d0:1002:51::/64',
'ip_version': '6',
'tenant_id': network['network']['tenant_id']}}
subnet_req = self.new_create_request('subnets', data)
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
def test_create_subnet_ipv6_gw_is_nw_end_addr_returns_201(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_ipv6_out_of_cidr_global(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_ipv6_pd_gw_values(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_ipv6_slaac_with_port_on_network(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_ipv6_slaac_with_snat_intf_on_network(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_dhcpv6_stateless_with_port_on_network(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_ipv6_slaac_with_dhcp_port_on_network(self):
self.skipTest('No DHCP v6 Support yet')
def test_delete_subnet_ipv6_slaac_port_exists(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_ipv6_slaac_with_router_intf_on_network(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_ipv6_out_of_cidr_lla(self):
self.skipTest('No DHCP v6 Support yet')
def test_update_subnet_inconsistent_ipv6_hostroute_dst_v4(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_only_ip_version_v6(self):
self.skipTest('No DHCP v6 Support yet')
def test_update_subnet_ipv6_address_mode_fails(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_with_v6_allocation_pool(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_with_v6_pd_allocation_pool(self):
self.skipTest('No DHCP v6 Support yet')
def test_update_subnet_ipv6_ra_mode_fails(self):
self.skipTest('No DHCP v6 Support yet')
def test_delete_subnet_ipv6_slaac_router_port_exists(self):
self.skipTest('No DHCP v6 Support yet')
def test_update_subnet_inconsistent_ipv6_hostroute_np_v4(self):
self.skipTest('No DHCP v6 Support yet')
def test_update_subnet_inconsistent_ipv6_gatewayv4(self):
self.skipTest('No DHCP v6 Support yet')
def test_update_subnet_ipv6_attributes_fails(self):
self.skipTest('No DHCP v6 Support yet')
def test_update_subnet_ipv6_cannot_disable_dhcp(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_V6_pd_slaac(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_V6_pd_stateless(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_V6_pd_statefull(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_V6_pd_no_mode(self):
self.skipTest('No DHCP v6 Support yet')
def _create_subnet_bulk(self, fmt, number, net_id, name,
ip_version=4, **kwargs):
base_data = {'subnet': {'network_id': net_id,
'ip_version': ip_version,
'enable_dhcp': False,
'tenant_id': self._tenant_id}}
# auto-generate cidrs as they should not overlap
overrides = dict((k, v)
for (k, v) in zip(range(number),
[{'cidr': "10.0.%s.0/24" % num}
for num in range(number)]))
kwargs.update({'override': overrides})
return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs)
def test_create_subnet_nonzero_cidr(self):
awkward_cidrs = [{'nonezero': '10.129.122.5/8',
'corrected': '10.0.0.0/8'},
{'nonezero': '11.129.122.5/15',
'corrected': '11.128.0.0/15'},
{'nonezero': '12.129.122.5/16',
'corrected': '12.129.0.0/16'},
{'nonezero': '13.129.122.5/18',
'corrected': '13.129.64.0/18'},
{'nonezero': '14.129.122.5/22',
'corrected': '14.129.120.0/22'},
{'nonezero': '15.129.122.5/24',
'corrected': '15.129.122.0/24'},
{'nonezero': '16.129.122.5/28',
'corrected': '16.129.122.0/28'}, ]
for cidr in awkward_cidrs:
with self.subnet(enable_dhcp=False,
cidr=cidr['nonezero']) as subnet:
# the API should accept and correct these cidrs for users
self.assertEqual(cidr['corrected'],
subnet['subnet']['cidr'])
with self.subnet(enable_dhcp=False, cidr='17.129.122.5/32',
gateway_ip=None) as subnet:
self.assertEqual('17.129.122.5/32', subnet['subnet']['cidr'])
def test_create_subnet_ipv6_attributes(self):
# Expected to fail for now as we don't support IPv6 for NSXv
cidr = "fe80::/80"
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(cidr=cidr)
self.assertEqual(ctx_manager.exception.code, 400)
def test_create_subnet_with_different_dhcp_server(self):
self.mock_create_dhcp_service.stop()
name = 'dvs-provider-net'
providernet_args = {pnet.NETWORK_TYPE: 'vlan',
pnet.SEGMENTATION_ID: 43,
pnet.PHYSICAL_NETWORK: 'dvs-uuid'}
with self.network(name=name, do_delete=False,
providernet_args=providernet_args,
arg_list=(pnet.NETWORK_TYPE,
pnet.SEGMENTATION_ID,
pnet.PHYSICAL_NETWORK)) as net:
self._test_create_subnet(network=net, cidr='10.0.0.0/24')
dhcp_router_id = (vcns_const.DHCP_EDGE_PREFIX +
net['network']['id'])[:36]
dhcp_server_id = nsxv_db.get_nsxv_router_binding(
self.context.session, dhcp_router_id)['edge_id']
providernet_args_1 = {pnet.NETWORK_TYPE: 'vlan',
pnet.SEGMENTATION_ID: 43,
pnet.PHYSICAL_NETWORK: 'dvs-uuid-1'}
with self.network(name=name, do_delete=False,
providernet_args=providernet_args_1,
arg_list=(pnet.NETWORK_TYPE,
pnet.SEGMENTATION_ID,
pnet.PHYSICAL_NETWORK)) as net1:
self._test_create_subnet(network=net1, cidr='10.0.1.0/24')
router_id = (vcns_const.DHCP_EDGE_PREFIX +
net1['network']['id'])[:36]
dhcp_server_id_1 = nsxv_db.get_nsxv_router_binding(
self.context.session, router_id)['edge_id']
self.assertNotEqual(dhcp_server_id, dhcp_server_id_1)
def test_create_subnet_with_different_dhcp_by_flat_net(self):
self.mock_create_dhcp_service.stop()
name = 'flat-net'
providernet_args = {pnet.NETWORK_TYPE: 'flat',
pnet.PHYSICAL_NETWORK: 'dvs-uuid'}
with self.network(name=name, do_delete=False,
providernet_args=providernet_args,
arg_list=(pnet.NETWORK_TYPE,
pnet.PHYSICAL_NETWORK)) as net:
self._test_create_subnet(network=net, cidr='10.0.0.0/24')
dhcp_router_id = (vcns_const.DHCP_EDGE_PREFIX +
net['network']['id'])[:36]
dhcp_server_id = nsxv_db.get_nsxv_router_binding(
self.context.session, dhcp_router_id)['edge_id']
providernet_args_1 = {pnet.NETWORK_TYPE: 'flat',
pnet.PHYSICAL_NETWORK: 'dvs-uuid'}
with self.network(name=name, do_delete=False,
providernet_args=providernet_args_1,
arg_list=(pnet.NETWORK_TYPE,
pnet.PHYSICAL_NETWORK)) as net1:
self._test_create_subnet(network=net1, cidr='10.0.1.0/24')
router_id = (vcns_const.DHCP_EDGE_PREFIX +
net1['network']['id'])[:36]
dhcp_server_id_1 = nsxv_db.get_nsxv_router_binding(
self.context.session, router_id)['edge_id']
self.assertNotEqual(dhcp_server_id, dhcp_server_id_1)
def test_create_subnet_ipv6_slaac_with_db_reference_error(self):
self.skipTest('Currently not support')
def test_create_subnet_ipv6_gw_values(self):
# This test should fail with response code 400 as IPv6 subnets with
# DHCP are not supported by this plugin
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
super(TestSubnetsV2, self).test_create_subnet_ipv6_gw_values()
self.assertEqual(ctx_manager.exception.code, 400)
def test_create_subnet_only_ip_version_v6_old(self):
self.skipTest('Currently not supported')
def test_create_subnet_reserved_network(self):
self.mock_create_dhcp_service.stop()
name = 'overlap-reserved-net'
providernet_args = {pnet.NETWORK_TYPE: 'flat',
pnet.PHYSICAL_NETWORK: 'dvs-uuid'}
with testlib_api.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
with self.network(name=name, do_delete=False,
providernet_args=providernet_args,
arg_list=(pnet.NETWORK_TYPE,
pnet.SEGMENTATION_ID,
pnet.PHYSICAL_NETWORK)) as net:
self._test_create_subnet(network=net,
cidr='169.254.128.128/25')
self.assertEqual(ctx_manager.exception.code, 400)
class TestSubnetPoolsV2(NsxVPluginV2TestCase, test_plugin.TestSubnetsV2):
def setUp(self,
plugin=PLUGIN_NAME,
ext_mgr=None,
service_plugins=None):
super(TestSubnetPoolsV2, self).setUp()
self.context = context.get_admin_context()
def test_subnet_update_ipv4_and_ipv6_pd_slaac_subnets(self):
self.skipTest('No DHCP v6 Support yet')
def test_subnet_update_ipv4_and_ipv6_pd_v6stateless_subnets(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_ipv6_gw_is_nw_end_addr_returns_201(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_ipv6_out_of_cidr_global(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_V6_pd_stateless(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_V6_pd_slaac(self):
self.skipTest('No DHCP v6 Support yet')
def test_create_subnet_dhcpv6_stateless_with_port_on_network(self):
self.skipTest('Not supported')
def test_create_subnet_ipv6_gw_values(self):
self.skipTest('Not supported')
def test_create_subnet_ipv6_out_of_cidr_lla(self):
self.skipTest('Not supported')
def test_create_subnet_ipv6_pd_gw_values(self):
self.skipTest('Not supported')
def test_create_subnet_ipv6_slaac_with_db_reference_error(self):
self.skipTest('Not supported')
def test_create_subnet_ipv6_slaac_with_dhcp_port_on_network(self):
self.skipTest('Not supported')
def test_create_subnet_ipv6_slaac_with_port_on_network(self):
self.skipTest('Not supported')
def test_create_subnet_ipv6_slaac_with_router_intf_on_network(self):
self.skipTest('Not supported')
def test_create_subnet_ipv6_slaac_with_snat_intf_on_network(self):
self.skipTest('Not supported')
def test_create_subnet_only_ip_version_v6(self):
self.skipTest('Not supported')
def test_create_subnet_with_v6_allocation_pool(self):
self.skipTest('Not supported')
def test_create_subnet_with_v6_pd_allocation_pool(self):
self.skipTest('Not supported')
def test_delete_subnet_ipv6_slaac_port_exists(self):
self.skipTest('Not supported')
def test_delete_subnet_ipv6_slaac_router_port_exists(self):
self.skipTest('Not supported')
def test_update_subnet_inconsistent_ipv6_gatewayv4(self):
self.skipTest('Not supported')
def test_update_subnet_inconsistent_ipv6_hostroute_dst_v4(self):
self.skipTest('Not supported')
def test_update_subnet_inconsistent_ipv6_hostroute_np_v4(self):
self.skipTest('Not supported')
def test_update_subnet_ipv6_address_mode_fails(self):
self.skipTest('Not supported')
def test_update_subnet_ipv6_attributes_fails(self):
self.skipTest('Not supported')
def test_update_subnet_ipv6_cannot_disable_dhcp(self):
self.skipTest('Not supported')
def test_update_subnet_ipv6_ra_mode_fails(self):
self.skipTest('Not supported')
def test_create_subnet_only_ip_version_v6_old(self):
self.skipTest('Currently not supported')
class TestBasicGet(test_plugin.TestBasicGet, NsxVPluginV2TestCase):
pass
class TestV2HTTPResponse(test_plugin.TestV2HTTPResponse, NsxVPluginV2TestCase):
pass
class TestL3ExtensionManager(object):
def get_resources(self):
# Simulate extension of L3 attribute map
# First apply attribute extensions
for key in l3.RESOURCE_ATTRIBUTE_MAP.keys():
l3.RESOURCE_ATTRIBUTE_MAP[key].update(
l3_ext_gw_mode.EXTENDED_ATTRIBUTES_2_0.get(key, {}))
l3.RESOURCE_ATTRIBUTE_MAP[key].update(
dist_router.EXTENDED_ATTRIBUTES_2_0.get(key, {}))
l3.RESOURCE_ATTRIBUTE_MAP[key].update(
router_type.EXTENDED_ATTRIBUTES_2_0.get(key, {}))
l3.RESOURCE_ATTRIBUTE_MAP[key].update(
router_size.EXTENDED_ATTRIBUTES_2_0.get(key, {}))
# Finally add l3 resources to the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
l3.RESOURCE_ATTRIBUTE_MAP)
return l3.L3.get_resources()
def get_actions(self):
return []
def get_request_extensions(self):
return []
def backup_l3_attribute_map():
"""Return a backup of the original l3 attribute map."""
return dict((res, attrs.copy()) for
(res, attrs) in six.iteritems(l3.RESOURCE_ATTRIBUTE_MAP))
def restore_l3_attribute_map(map_to_restore):
"""Ensure changes made by fake ext mgrs are reverted."""
l3.RESOURCE_ATTRIBUTE_MAP = map_to_restore
class L3NatTest(test_l3_plugin.L3BaseForIntTests, NsxVPluginV2TestCase):
def _restore_l3_attribute_map(self):
l3.RESOURCE_ATTRIBUTE_MAP = self._l3_attribute_map_bk
def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None, service_plugins=None):
self._l3_attribute_map_bk = {}
for item in l3.RESOURCE_ATTRIBUTE_MAP:
self._l3_attribute_map_bk[item] = (
l3.RESOURCE_ATTRIBUTE_MAP[item].copy())
cfg.CONF.set_override('task_status_check_interval', 200, group="nsxv")
cfg.CONF.set_override('api_extensions_path', vmware.NSXEXT_PATH)
l3_attribute_map_bk = backup_l3_attribute_map()
self.addCleanup(restore_l3_attribute_map, l3_attribute_map_bk)
ext_mgr = ext_mgr or TestL3ExtensionManager()
super(L3NatTest, self).setUp(
plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins)
self.plugin_instance = manager.NeutronManager.get_plugin()
self._plugin_name = "%s.%s" % (
self.plugin_instance.__module__,
self.plugin_instance.__class__.__name__)
self._plugin_class = self.plugin_instance.__class__
def tearDown(self):
plugin = manager.NeutronManager.get_plugin()
_manager = plugin.nsx_v.task_manager
# wait max ~10 seconds for all tasks to be finished
for i in range(100):
if not _manager.has_pending_task():
break
greenthread.sleep(0.1)
if _manager.has_pending_task():
_manager.show_pending_tasks()
raise Exception(_("Tasks not completed"))
_manager.stop()
# Ensure the manager thread has been stopped
self.assertIsNone(_manager._thread)
super(L3NatTest, self).tearDown()
def _create_l3_ext_network(self, vlan_id=None):
name = 'l3_ext_net'
return self.network(name=name,
router__external=True)
def _create_router(self, fmt, tenant_id, name=None,
admin_state_up=None, set_context=False,
arg_list=None, **kwargs):
tenant_id = tenant_id or _uuid()
data = {'router': {'tenant_id': tenant_id}}
if name:
data['router']['name'] = name
if admin_state_up:
data['router']['admin_state_up'] = admin_state_up
for arg in (('admin_state_up', 'tenant_id')
+ (arg_list or ())):
# Arg must be present and not empty
if kwargs.get(arg):
data['router'][arg] = kwargs[arg]
router_req = self.new_create_request('routers', data, fmt)
if set_context and tenant_id:
# create a specific auth context for this request
router_req.environ['neutron.context'] = context.Context(
'', tenant_id)
return router_req.get_response(self.ext_api)
def _make_router(self, fmt, tenant_id, name=None, admin_state_up=None,
external_gateway_info=None, set_context=False,
arg_list=None, **kwargs):
if external_gateway_info:
arg_list = ('external_gateway_info', ) + (arg_list or ())
res = self._create_router(fmt, tenant_id, name,
admin_state_up, set_context,
arg_list=arg_list,
external_gateway_info=external_gateway_info,
**kwargs)
return self.deserialize(fmt, res)
@contextlib.contextmanager
def router(self, name=None, admin_state_up=True,
fmt=None, tenant_id=None,
external_gateway_info=None, set_context=False,
**kwargs):
# avoid name duplication of edge
if not name:
name = _uuid()
router = self._make_router(fmt or self.fmt, tenant_id, name,
admin_state_up, external_gateway_info,
set_context, **kwargs)
yield router
def _recursive_sort_list(self, lst):
sorted_list = []
for ele in lst:
if isinstance(ele, list):
sorted_list.append(self._recursive_sort_list(ele))
elif isinstance(ele, dict):
sorted_list.append(self._recursive_sort_dict(ele))
else:
sorted_list.append(ele)
return sorted(sorted_list, key=utils.safe_sort_key)
def _recursive_sort_dict(self, dct):
sorted_dict = {}
for k, v in dct.items():
if isinstance(v, list):
sorted_dict[k] = self._recursive_sort_list(v)
elif isinstance(v, dict):
sorted_dict[k] = self._recursive_sort_dict(v)
else:
sorted_dict[k] = v
return sorted_dict
def _update_router_enable_snat(self, router_id, network_id, enable_snat):
return self._update('routers', router_id,
{'router': {'external_gateway_info':
{'network_id': network_id,
'enable_snat': enable_snat}}})
def test_floatingip_association_on_unowned_router(self):
self.skipTest("Currently no support in plugin for this")
def test_router_add_gateway_no_subnet(self):
self.skipTest('No support for no subnet gateway set')
def _set_net_external(self, net_id):
self._update('networks', net_id,
{'network': {external_net.EXTERNAL: True}})
def _add_external_gateway_to_router(self, router_id, network_id,
expected_code=webob.exc.HTTPOk.code,
neutron_context=None, ext_ips=None):
ext_ips = ext_ips or []
body = {'router':
{'external_gateway_info': {'network_id': network_id}}}
if ext_ips:
body['router']['external_gateway_info'][
'external_fixed_ips'] = ext_ips
return self._update('routers', router_id, body,
expected_code=expected_code,
neutron_context=neutron_context)
def test_router_add_gateway_no_subnet_forbidden(self):
with self.router() as r:
with self.network() as n:
self._set_net_external(n['network']['id'])
self._add_external_gateway_to_router(
r['router']['id'], n['network']['id'],
expected_code=webob.exc.HTTPBadRequest.code)
class L3NatTestCaseBase(test_l3_plugin.L3NatTestCaseMixin):
def test_create_floatingip_with_specific_ip(self):
with self.subnet(cidr='10.0.0.0/24',
enable_dhcp=False) as s:
network_id = s['subnet']['network_id']
self._set_net_external(network_id)
fp = self._make_floatingip(self.fmt, network_id,
floating_ip='10.0.0.10')
self.assertEqual('10.0.0.10',
fp['floatingip']['floating_ip_address'])
def test_floatingip_same_external_and_internal(self):
# Select router with subnet's gateway_ip for floatingip when
# routers connected to same subnet and external network.
with self.subnet(cidr="10.0.0.0/24", enable_dhcp=False) as exs,\
self.subnet(cidr="12.0.0.0/24",
gateway_ip="12.0.0.50",
enable_dhcp=False) as ins:
network_ex_id = exs['subnet']['network_id']
self._set_net_external(network_ex_id)
r2i_fixed_ips = [{'ip_address': '12.0.0.2'}]
with self.router() as r1,\
self.router() as r2,\
self.port(subnet=ins,
fixed_ips=r2i_fixed_ips) as r2i_port:
self._add_external_gateway_to_router(
r1['router']['id'],
network_ex_id)
self._router_interface_action('add', r2['router']['id'],
None,
r2i_port['port']['id'])
self._router_interface_action('add', r1['router']['id'],
ins['subnet']['id'],
None)
self._add_external_gateway_to_router(
r2['router']['id'],
network_ex_id)
with self.port(subnet=ins,
fixed_ips=[{'ip_address': '12.0.0.8'}]
) as private_port:
fp = self._make_floatingip(self.fmt, network_ex_id,
private_port['port']['id'])
self.assertEqual(r1['router']['id'],
fp['floatingip']['router_id'])
def test_create_floatingip_with_specific_ip_out_of_allocation(self):
with self.subnet(cidr='10.0.0.0/24',
allocation_pools=[
{'start': '10.0.0.10', 'end': '10.0.0.20'}],
enable_dhcp=False) as s:
network_id = s['subnet']['network_id']
self._set_net_external(network_id)
fp = self._make_floatingip(self.fmt, network_id,
floating_ip='10.0.0.30')
self.assertEqual('10.0.0.30',
fp['floatingip']['floating_ip_address'])
def test_create_floatingip_with_specific_ip_non_admin(self):
ctx = context.Context('user_id', 'tenant_id')
with self.subnet(cidr='10.0.0.0/24',
enable_dhcp=False) as s:
network_id = s['subnet']['network_id']
self._set_net_external(network_id)
self._make_floatingip(self.fmt, network_id,
set_context=ctx,
floating_ip='10.0.0.10',
http_status=webob.exc.HTTPForbidden.code)
def test_create_floatingip_with_specific_ip_out_of_subnet(self):
with self.subnet(cidr='10.0.0.0/24',
enable_dhcp=False) as s:
network_id = s['subnet']['network_id']
self._set_net_external(network_id)
self._make_floatingip(self.fmt, network_id,
floating_ip='10.0.1.10',
http_status=webob.exc.HTTPBadRequest.code)
def test_floatingip_multi_external_one_internal(self):
with self.subnet(cidr="10.0.0.0/24", enable_dhcp=False) as exs1,\
self.subnet(cidr="11.0.0.0/24", enable_dhcp=False) as exs2,\
self.subnet(cidr="12.0.0.0/24", enable_dhcp=False) as ins1:
network_ex_id1 = exs1['subnet']['network_id']
network_ex_id2 = exs2['subnet']['network_id']
self._set_net_external(network_ex_id1)
self._set_net_external(network_ex_id2)
r2i_fixed_ips = [{'ip_address': '12.0.0.2'}]
with self.router() as r1,\
self.router() as r2,\
self.port(subnet=ins1,
fixed_ips=r2i_fixed_ips) as r2i_port:
self._add_external_gateway_to_router(
r1['router']['id'],
network_ex_id1)
self._router_interface_action('add', r1['router']['id'],
ins1['subnet']['id'],
None)
self._add_external_gateway_to_router(
r2['router']['id'],
network_ex_id2)
self._router_interface_action('add', r2['router']['id'],
None,
r2i_port['port']['id'])
with self.port(subnet=ins1,
fixed_ips=[{'ip_address': '12.0.0.3'}]
) as private_port:
fp1 = self._make_floatingip(self.fmt, network_ex_id1,
private_port['port']['id'])
fp2 = self._make_floatingip(self.fmt, network_ex_id2,
private_port['port']['id'])
self.assertEqual(fp1['floatingip']['router_id'],
r1['router']['id'])
self.assertEqual(fp2['floatingip']['router_id'],
r2['router']['id'])
def test_create_floatingip_with_multisubnet_id(self):
with self.network() as network:
self._set_net_external(network['network']['id'])
with self.subnet(network,
enable_dhcp=False,
cidr='10.0.12.0/24') as subnet1:
with self.subnet(network,
enable_dhcp=False,
cidr='10.0.13.0/24') as subnet2:
with self.router():
res = self._create_floatingip(
self.fmt,
subnet1['subnet']['network_id'],
subnet_id=subnet1['subnet']['id'])
fip1 = self.deserialize(self.fmt, res)
res = self._create_floatingip(
self.fmt,
subnet1['subnet']['network_id'],
subnet_id=subnet2['subnet']['id'])
fip2 = self.deserialize(self.fmt, res)
self.assertTrue(
fip1['floatingip']['floating_ip_address'].startswith('10.0.12'))
self.assertTrue(
fip2['floatingip']['floating_ip_address'].startswith('10.0.13'))
def test_create_floatingip_with_wrong_subnet_id(self):
with self.network() as network1:
self._set_net_external(network1['network']['id'])
with self.subnet(network1,
enable_dhcp=False,
cidr='10.0.12.0/24') as subnet1:
with self.network() as network2: