diff --git a/vmware_nsx/tests/unit/nsx_v/test_plugin.py b/vmware_nsx/tests/unit/nsx_v/test_plugin.py index 2bcff6195d..5025559aed 100644 --- a/vmware_nsx/tests/unit/nsx_v/test_plugin.py +++ b/vmware_nsx/tests/unit/nsx_v/test_plugin.py @@ -15,7 +15,8 @@ import contextlib import copy -import itertools + +import decorator from eventlet import greenthread import mock @@ -58,7 +59,6 @@ from neutron_lib.plugins import directory from neutron_lib.plugins import utils from neutron_lib.services.qos import constants as qos_consts from neutron_lib.utils import helpers -from neutron_lib.utils import net from oslo_config import cfg from oslo_utils import uuidutils import six @@ -118,6 +118,17 @@ def set_az_in_config(name, resource_pool_id="respool-7", group=group_name) +# Override subnet creation in some tests to create a subnet with dhcp +# disabled +@decorator.decorator +def with_no_dhcp_subnet(f, *args, **kwargs): + obj = args[0] + obj.subnet = obj.no_dhcp_subnet + result = f(*args, **kwargs) + obj.subnet = obj.original_subnet + return result + + class NsxVPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase): def _create_network(self, fmt, name, admin_state_up, @@ -279,6 +290,7 @@ class NsxVPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase): plugin_instance.init_complete(None, None, {}) self.context = context.get_admin_context() + self.original_subnet = self.subnet self.internal_net_id = None if self.with_md_proxy: @@ -287,6 +299,11 @@ class NsxVPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase): vcns_const.InternalEdgePurposes.INTER_EDGE_PURPOSE, 'default')['network_id'] + def no_dhcp_subnet(self, *args, **kwargs): + if 'enable_dhcp' in kwargs: + return self.original_subnet(*args, **kwargs) + return self.original_subnet(*args, enable_dhcp=False, **kwargs) + def _get_core_plugin_with_dvs(self): # enable dvs features to allow policy with QOS cfg.CONF.set_default('use_dvs_features', True, 'nsxv') @@ -1071,19 +1088,9 @@ class TestPortsV2(NsxVPluginV2TestCase, net_id2 = port['port']['id'] # other net uuid, same mac self.assertTrue(self.plugin._is_mac_in_use(ctx, net_id2, mac)) + @with_no_dhcp_subnet def test_duplicate_mac_generation(self): - # simulate duplicate mac generation to make sure DBDuplicate is retried - responses = ['12:34:56:78:00:00', '12:34:56:78:00:00', - '12:34:56:78:00:01'] - with mock.patch.object(net, 'random_mac_generator', - return_value=itertools.cycle(responses)) as grand_mac: - with self.subnet(enable_dhcp=False) as s: - with self.port(subnet=s) as p1, self.port(subnet=s) as p2: - self.assertEqual('12:34:56:78:00:00', - p1['port']['mac_address']) - self.assertEqual('12:34:56:78:00:01', - p2['port']['mac_address']) - self.assertEqual(3, grand_mac.call_count) + return super(TestPortsV2, self).test_duplicate_mac_generation() def test_get_ports_count(self): with self.port(), self.port(), self.port(), self.port() as p: @@ -1096,31 +1103,9 @@ class TestPortsV2(NsxVPluginV2TestCase, # for DHCP self.assertEqual(8, count) + @with_no_dhcp_subnet def test_requested_ips_only(self): - with self.subnet(enable_dhcp=False) as subnet: - fixed_ip_data = [{'ip_address': '10.0.0.2', - 'subnet_id': subnet['subnet']['id']}] - with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port: - ips = port['port']['fixed_ips'] - self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.2', ips[0]['ip_address']) - self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) - ips_only = ['10.0.0.18', '10.0.0.20', '10.0.0.22', '10.0.0.21', - '10.0.0.3', '10.0.0.17', '10.0.0.19'] - ports_to_delete = [] - for i in ips_only: - kwargs = {"fixed_ips": [{'ip_address': i}]} - net_id = port['port']['network_id'] - res = self._create_port(self.fmt, net_id=net_id, **kwargs) - port = self.deserialize(self.fmt, res) - ports_to_delete.append(port) - ips = port['port']['fixed_ips'] - self.assertEqual(1, len(ips)) - self.assertEqual(i, ips[0]['ip_address']) - self.assertEqual(subnet['subnet']['id'], - ips[0]['subnet_id']) - for p in ports_to_delete: - self._delete('ports', p['port']['id']) + return super(TestPortsV2, self).test_requested_ips_only() def test_delete_network_port_exists_owned_by_network_race(self): self.skipTest('Skip need to address in future') @@ -1177,91 +1162,31 @@ class TestPortsV2(NsxVPluginV2TestCase, def test_create_port_anticipating_allocation(self): self.skipTest('Multiple fixed ips on a port are not supported') + @with_no_dhcp_subnet def test_list_ports(self): - # for this test we need to enable overlapping ips - cfg.CONF.set_default('allow_overlapping_ips', True) - with self.subnet(enable_dhcp=False) as subnet,\ - self.port(subnet) as port1,\ - self.port(subnet) as port2,\ - self.port(subnet) as port3: - self._test_list_resources('port', [port1, port2, port3]) + return super(TestPortsV2, self).test_list_ports() + @with_no_dhcp_subnet def test_list_ports_public_network(self): - with self.network(shared=True) as network: - with self.subnet(network, enable_dhcp=False) as subnet,\ - self.port(subnet, tenant_id='tenant_1') as port1,\ - self.port(subnet, tenant_id='tenant_2') as port2: - - # Admin request - must return both ports - self._test_list_resources('port', [port1, port2]) - # Tenant_1 request - must return single port - q_context = context.Context('', 'tenant_1') - self._test_list_resources('port', [port1], - neutron_context=q_context) - # Tenant_2 request - must return single port - q_context = context.Context('', 'tenant_2') - self._test_list_resources('port', [port2], - neutron_context=q_context) + return super(TestPortsV2, self).test_list_ports_public_network() + @with_no_dhcp_subnet def test_list_ports_with_pagination_emulated(self): - helper_patcher = mock.patch( - 'neutron.api.v2.base.Controller._get_pagination_helper', - new=test_plugin._fake_get_pagination_helper) - helper_patcher.start() - cfg.CONF.set_default('allow_overlapping_ips', True) - with self.subnet(enable_dhcp=False) as subnet,\ - self.port(subnet, mac_address='00:00:00:00:00:01') as port1,\ - self.port(subnet, mac_address='00:00:00:00:00:02') as port2,\ - self.port(subnet, mac_address='00:00:00:00:00:03') as port3: - self._test_list_with_pagination('port', - (port1, port2, port3), - ('mac_address', 'asc'), 2, 2) + return super(TestPortsV2, + self).test_list_ports_with_pagination_emulated() + @with_no_dhcp_subnet def test_list_ports_with_pagination_native(self): - if self._skip_native_pagination: - self.skipTest("Skip test for not implemented pagination feature") - cfg.CONF.set_default('allow_overlapping_ips', True) - with self.subnet(enable_dhcp=False) as subnet,\ - self.port(subnet, mac_address='00:00:00:00:00:01') as port1,\ - self.port(subnet, mac_address='00:00:00:00:00:02') as port2,\ - self.port(subnet, mac_address='00:00:00:00:00:03') as port3: - self._test_list_with_pagination('port', - (port1, port2, port3), - ('mac_address', 'asc'), 2, 2) + return super(TestPortsV2, + self).test_list_ports_with_pagination_native() + @with_no_dhcp_subnet def test_list_ports_with_sort_emulated(self): - helper_patcher = mock.patch( - 'neutron.api.v2.base.Controller._get_sorting_helper', - new=test_plugin._fake_get_sorting_helper) - helper_patcher.start() - cfg.CONF.set_default('allow_overlapping_ips', True) - with self.subnet(enable_dhcp=False) as subnet,\ - self.port(subnet, admin_state_up='True', - mac_address='00:00:00:00:00:01') as port1,\ - self.port(subnet, admin_state_up='False', - mac_address='00:00:00:00:00:02') as port2,\ - self.port(subnet, admin_state_up='False', - mac_address='00:00:00:00:00:03') as port3: - - self._test_list_with_sort('port', (port3, port2, port1), - [('admin_state_up', 'asc'), - ('mac_address', 'desc')]) + return super(TestPortsV2, self).test_list_ports_with_sort_emulated() + @with_no_dhcp_subnet def test_list_ports_with_sort_native(self): - if self._skip_native_sorting: - self.skipTest("Skip test for not implemented sorting feature") - cfg.CONF.set_default('allow_overlapping_ips', True) - with self.subnet(enable_dhcp=False) as subnet,\ - self.port(subnet, admin_state_up='True', - mac_address='00:00:00:00:00:01') as port1,\ - self.port(subnet, admin_state_up='False', - mac_address='00:00:00:00:00:02') as port2,\ - self.port(subnet, admin_state_up='False', - mac_address='00:00:00:00:00:03') as port3: - - self._test_list_with_sort('port', (port3, port2, port1), - [('admin_state_up', 'asc'), - ('mac_address', 'desc')]) + return super(TestPortsV2, self).test_list_ports_with_sort_native() def test_list_ports_filtered_by_security_groups(self): ctx = context.get_admin_context() @@ -1413,93 +1338,23 @@ class TestPortsV2(NsxVPluginV2TestCase, self.new_update_request('ports', update, port['port']['id']) + @with_no_dhcp_subnet def test_ports_vif_host(self): - cfg.CONF.set_default('allow_overlapping_ips', True) - host_arg = {portbindings.HOST_ID: self.hostname} - with self.subnet(enable_dhcp=False) as subnet,\ - self.port(subnet, name='name1', - arg_list=(portbindings.HOST_ID,), **host_arg),\ - self.port(subnet, name='name2'): - ctx = context.get_admin_context() - ports = self._list('ports', neutron_context=ctx)['ports'] - self.assertEqual(2, len(ports)) - for port in ports: - if port['name'] == 'name1': - self._check_response_portbindings_host(port) - else: - self.assertFalse(port[portbindings.HOST_ID]) - # By default user is admin - now test non admin user - ctx = context.Context(user_id=None, - tenant_id=self._tenant_id, - is_admin=False) - ports = self._list('ports', neutron_context=ctx)['ports'] - self.assertEqual(2, len(ports)) - for non_admin_port in ports: - self._check_response_no_portbindings_host(non_admin_port) + return super(TestPortsV2, self).test_ports_vif_host() + @with_no_dhcp_subnet def test_ports_vif_host_update(self): - cfg.CONF.set_default('allow_overlapping_ips', True) - host_arg = {portbindings.HOST_ID: self.hostname} - with self.subnet(enable_dhcp=False) as subnet,\ - self.port(subnet, name='name1', - arg_list=(portbindings.HOST_ID,), - **host_arg) as port1,\ - self.port(subnet, name='name2') as port2: - data = {'port': {portbindings.HOST_ID: 'testhosttemp'}} - req = self.new_update_request( - 'ports', data, port1['port']['id']) - req.get_response(self.api) - req = self.new_update_request( - 'ports', data, port2['port']['id']) - ctx = context.get_admin_context() - req.get_response(self.api) - ports = self._list('ports', neutron_context=ctx)['ports'] - self.assertEqual(2, len(ports)) - for port in ports: - self.assertEqual('testhosttemp', port[portbindings.HOST_ID]) + return super(TestPortsV2, self).test_ports_vif_host_update() + @with_no_dhcp_subnet def test_ports_vif_details(self): - plugin = directory.get_plugin() - cfg.CONF.set_default('allow_overlapping_ips', True) - with self.subnet(enable_dhcp=False) as subnet,\ - self.port(subnet), self.port(subnet): - ctx = context.get_admin_context() - ports = plugin.get_ports(ctx) - self.assertEqual(len(ports), 2) - for port in ports: - self._check_response_portbindings(port) - # By default user is admin - now test non admin user - ctx = self._get_non_admin_context() - ports = self._list('ports', neutron_context=ctx)['ports'] - self.assertEqual(len(ports), 2) - for non_admin_port in ports: - self._check_response_no_portbindings(non_admin_port) + return super(TestPortsV2, self).test_ports_vif_details() + @with_no_dhcp_subnet def test_ports_vnic_type(self): - cfg.CONF.set_default('allow_overlapping_ips', True) - vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type} - with self.subnet(enable_dhcp=False) as subnet,\ - self.port(subnet, name='name1', - arg_list=(portbindings.VNIC_TYPE,), **vnic_arg),\ - self.port(subnet, name='name2'): - ctx = context.get_admin_context() - ports = self._list('ports', neutron_context=ctx)['ports'] - self.assertEqual(2, len(ports)) - for port in ports: - if port['name'] == 'name1': - self._check_response_portbindings_vnic_type(port) - else: - self.assertEqual(portbindings.VNIC_NORMAL, - port[portbindings.VNIC_TYPE]) - # By default user is admin - now test non admin user - ctx = context.Context(user_id=None, - tenant_id=self._tenant_id, - is_admin=False) - ports = self._list('ports', neutron_context=ctx)['ports'] - self.assertEqual(2, len(ports)) - for non_admin_port in ports: - self._check_response_portbindings_vnic_type(non_admin_port) + return super(TestPortsV2, self).test_ports_vnic_type() + @with_no_dhcp_subnet def test_ports_vnic_type_list(self): cfg.CONF.set_default('allow_overlapping_ips', True) vnic_arg = {portbindings.VNIC_TYPE: self.vnic_type} @@ -1534,51 +1389,13 @@ class TestPortsV2(NsxVPluginV2TestCase, def test_requested_subnet_id_v4_and_v6(self): self.skipTest('Multiple fixed ips on a port are not supported') + @with_no_dhcp_subnet def test_update_port_update_ip(self): - """Test update of port IP. - - Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10. - """ - with self.subnet(enable_dhcp=False) as subnet: - fixed_ip_data = [{'ip_address': '10.0.0.2', - 'subnet_id': subnet['subnet']['id']}] - with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port: - ips = port['port']['fixed_ips'] - self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.2', ips[0]['ip_address']) - self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) - data = {'port': {'fixed_ips': [{'subnet_id': - subnet['subnet']['id'], - 'ip_address': "10.0.0.10"}]}} - req = self.new_update_request('ports', data, - port['port']['id']) - res = self.deserialize(self.fmt, req.get_response(self.api)) - ips = res['port']['fixed_ips'] - self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.10', ips[0]['ip_address']) - self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) + return super(TestPortsV2, self).test_update_port_update_ip() + @with_no_dhcp_subnet def test_update_port_update_ips(self): - """Update IP and associate new IP on port. - - Check a port update with the specified subnet_id's. A IP address - will be allocated for each subnet_id. - """ - with self.subnet(enable_dhcp=False) as subnet: - with self.port(subnet=subnet) as port: - data = {'port': {'admin_state_up': False, - 'fixed_ips': [{'subnet_id': - subnet['subnet']['id'], - 'ip_address': '10.0.0.3'}]}} - req = self.new_update_request('ports', data, - port['port']['id']) - res = self.deserialize(self.fmt, req.get_response(self.api)) - self.assertEqual(data['port']['admin_state_up'], - res['port']['admin_state_up']) - ips = res['port']['fixed_ips'] - self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.3', ips[0]['ip_address'], '10.0.0.3') - self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) + return super(TestPortsV2, self).test_update_port_update_ips() def test_update_port_update_ip_dhcp(self): #Test updating a port IP when the device owner is DHCP @@ -1785,28 +1602,9 @@ class TestPortsV2(NsxVPluginV2TestCase, test_create_port_with_multiple_ipv4_and_ipv6_subnets() self.assertEqual(ctx_manager.exception.code, 400) + @with_no_dhcp_subnet def test_list_ports_for_network_owner(self): - with self.network(tenant_id='tenant_1') as network: - with self.subnet(network, enable_dhcp=False) as subnet: - with self.port(subnet, tenant_id='tenant_1') as port1,\ - self.port(subnet, tenant_id='tenant_2') as port2: - # network owner request, should return all ports - port_res = self._list_ports( - 'json', set_context=True, tenant_id='tenant_1') - port_list = self.deserialize('json', port_res)['ports'] - port_ids = [p['id'] for p in port_list] - self.assertEqual(2, len(port_list)) - self.assertIn(port1['port']['id'], port_ids) - self.assertIn(port2['port']['id'], port_ids) - - # another tenant request, only return ports belong to it - port_res = self._list_ports( - 'json', set_context=True, tenant_id='tenant_2') - port_list = self.deserialize('json', port_res)['ports'] - port_ids = [p['id'] for p in port_list] - self.assertEqual(1, len(port_list)) - self.assertNotIn(port1['port']['id'], port_ids) - self.assertIn(port2['port']['id'], port_ids) + return super(TestPortsV2, self).test_list_ports_for_network_owner() def test_mac_duplication(self): # create 2 networks @@ -2089,32 +1887,9 @@ class TestSubnetsV2(NsxVPluginV2TestCase, kwargs.update({'override': overrides}) return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs) + @with_no_dhcp_subnet def test_create_subnet_nonzero_cidr(self): - awkward_cidrs = [{'nonezero': '10.129.122.5/8', - 'corrected': '10.0.0.0/8'}, - {'nonezero': '11.129.122.5/15', - 'corrected': '11.128.0.0/15'}, - {'nonezero': '12.129.122.5/16', - 'corrected': '12.129.0.0/16'}, - {'nonezero': '13.129.122.5/18', - 'corrected': '13.129.64.0/18'}, - {'nonezero': '14.129.122.5/22', - 'corrected': '14.129.120.0/22'}, - {'nonezero': '15.129.122.5/24', - 'corrected': '15.129.122.0/24'}, - {'nonezero': '16.129.122.5/28', - 'corrected': '16.129.122.0/28'}, ] - - for cidr in awkward_cidrs: - with self.subnet(enable_dhcp=False, - cidr=cidr['nonezero']) as subnet: - # the API should accept and correct these cidrs for users - self.assertEqual(cidr['corrected'], - subnet['subnet']['cidr']) - - with self.subnet(enable_dhcp=False, cidr='17.129.122.5/32', - gateway_ip=None) as subnet: - self.assertEqual('17.129.122.5/32', subnet['subnet']['cidr']) + return super(TestSubnetsV2, self).test_create_subnet_nonzero_cidr() def test_create_subnet_ipv6_attributes(self): # Expected to fail for now as we don't support IPv6 for NSXv @@ -4027,6 +3802,12 @@ class NsxVSecurityGroupsTestCase(ext_sg.SecurityGroupDBTestCase): ext_mgr=ext_mgr) self.plugin = directory.get_plugin() self.addCleanup(self.fc2.reset_all) + self.original_subnet = self.subnet + + def no_dhcp_subnet(self, *args, **kwargs): + if 'enable_dhcp' in kwargs: + return self.original_subnet(*args, **kwargs) + return self.original_subnet(*args, enable_dhcp=False, **kwargs) class NsxVTestSecurityGroup(ext_sg.TestSecurityGroups, @@ -4047,16 +3828,10 @@ class NsxVTestSecurityGroup(ext_sg.TestSecurityGroups, plugin_instance._get_edge_id_and_az_by_rtr_id.return_value = ( False, False) + @with_no_dhcp_subnet def test_list_ports_security_group(self): - with self.network() as n: - with self.subnet(n, enable_dhcp=False): - self._create_port(self.fmt, n['network']['id']) - req = self.new_list_request('ports') - res = req.get_response(self.api) - ports = self.deserialize(self.fmt, res) - port = ports['ports'][0] - self.assertEqual(len(port[secgrp.SECURITYGROUPS]), 1) - self._delete('ports', port['id']) + return super(NsxVTestSecurityGroup, + self).test_list_ports_security_group() def test_vnic_security_group_membership(self): p = directory.get_plugin()