# Copyright (c) 2014 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef from neutron_lib.api.definitions import port_security as psec from oslo_config import cfg from neutron.tests.unit.db import test_allowedaddresspairs_db as ext_pairs from vmware_nsx.tests.unit.nsx_p import test_plugin as test_p_plugin from vmware_nsx.tests.unit.nsx_v import test_plugin as test_nsx_v_plugin from vmware_nsx.tests.unit.nsx_v3 import test_constants as v3_constants from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_v3_plugin class TestAllowedAddressPairsNSXv2(test_v3_plugin.NsxV3PluginTestCaseMixin, ext_pairs.TestAllowedAddressPairs): # TODO(arosen): move to ext_pairs.TestAllowedAddressPairs once all # plugins do this correctly. def test_create_port_no_allowed_address_pairs(self): with self.network() as net: res = self._create_port(self.fmt, net['network']['id']) port = self.deserialize(self.fmt, res) self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], []) self._delete('ports', port['port']['id']) def test_create_port_security_false_allowed_address_pairs(self): self.skipTest('TBD') def test_create_overlap_with_fixed_ip(self): self.skipTest('Not supported') class TestAllowedAddressPairsNSXp(test_p_plugin.NsxPPluginTestCaseMixin, ext_pairs.TestAllowedAddressPairs): def setUp(self, plugin=test_p_plugin.PLUGIN_NAME, ext_mgr=None, service_plugins=None): super(TestAllowedAddressPairsNSXp, self).setUp( plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins) def test_create_bad_address_pairs_with_cidr(self): address_pairs = [{'mac_address': '00:00:00:00:00:01', 'ip_address': '10.0.0.1/24'}] self._create_port_with_address_pairs(address_pairs, 400) def test_create_port_allowed_address_pairs_v6(self): with self.network() as net: # Single IPv6 address_pairs = [{'ip_address': '1001::12'}] res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) address_pairs[0]['mac_address'] = port['port']['mac_address'] self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], address_pairs) self._delete('ports', port['port']['id']) # IPv6 cidr address_pairs = [{'ip_address': '1001::/64'}] res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) address_pairs[0]['mac_address'] = port['port']['mac_address'] self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], address_pairs) self._delete('ports', port['port']['id']) # Illegal IPv6 cidr address_pairs = [{'ip_address': '1001::12/64'}] res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) self.assertIn('NeutronError', port) # Too many ipv6 pairs cfg.CONF.set_default('max_allowed_address_pair', 300) address_pairs = [] count = 1 while count < 17: address_pairs.append({'ip_address': '1001::%s' % count}) count += 1 res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) self.assertIn('NeutronError', port) # Legal number of ipv6 pairs address_pairs = [] count = 1 while count < 13: address_pairs.append({'ip_address': '1001::%s' % count}) count += 1 res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) self.assertNotIn('NeutronError', port) self._delete('ports', port['port']['id']) def test_create_port_allowed_address_pairs_v4(self): with self.network() as net: # Single IPv4 address_pairs = [{'ip_address': '10.0.0.12'}] res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) address_pairs[0]['mac_address'] = port['port']['mac_address'] self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], address_pairs) self._delete('ports', port['port']['id']) # IPv4 cidr address_pairs = [{'ip_address': '10.0.0.0/24'}] res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) address_pairs[0]['mac_address'] = port['port']['mac_address'] self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], address_pairs) self._delete('ports', port['port']['id']) # Illegal IPv4 cidr address_pairs = [{'ip_address': '10.0.0.1/24'}] res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) self.assertIn('NeutronError', port) # Too many ipv4 pairs cfg.CONF.set_default('max_allowed_address_pair', 300) address_pairs = [] count = 1 while count < 129: address_pairs.append({'ip_address': '10.0.0.%s' % count}) count += 1 res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) self.assertIn('NeutronError', port) # Legal number of ipv4 pairs address_pairs = [] count = 1 while count < 125: address_pairs.append({'ip_address': '10.0.0.%s' % count}) count += 1 res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) self.assertNotIn('NeutronError', port) self._delete('ports', port['port']['id']) def test_update_add_bad_address_pairs_with_cidr(self): with self.network() as net: res = self._create_port(self.fmt, net['network']['id']) port = self.deserialize(self.fmt, res) address_pairs = [{'mac_address': '00:00:00:00:00:01', 'ip_address': '10.0.0.1/24'}] update_port = {'port': {addr_apidef.ADDRESS_PAIRS: address_pairs}} req = self.new_update_request('ports', update_port, port['port']['id']) res = req.get_response(self.api) self.assertEqual(res.status_int, 400) self._delete('ports', port['port']['id']) def test_mac_configuration(self): address_pairs = [{'mac_address': 'fa16.3e3e.3d01', 'ip_address': '10.0.0.1'}] self._create_port_with_address_pairs(address_pairs, 201) address_pairs = [{'mac_address': 'fa-16-3e-3e-3d-c4', 'ip_address': '10.0.0.1'}] self._create_port_with_address_pairs(address_pairs, 201) def test_create_port_security_false_allowed_address_pairs(self): self.skipTest('TBD') def test_create_overlap_with_fixed_ip(self): self.skipTest('Not supported') class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin, ext_pairs.TestAllowedAddressPairs): def setUp(self, plugin=v3_constants.PLUGIN_NAME, ext_mgr=None, service_plugins=None): super(TestAllowedAddressPairsNSXv3, self).setUp( plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins) def test_create_bad_address_pairs_with_cidr(self): address_pairs = [{'mac_address': '00:00:00:00:00:01', 'ip_address': '10.0.0.1/24'}] self._create_port_with_address_pairs(address_pairs, 400) def test_create_port_allowed_address_pairs_v6(self): with self.network() as net: # Single IPv6 address address_pairs = [{'ip_address': '1001::12'}] res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) address_pairs[0]['mac_address'] = port['port']['mac_address'] self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], address_pairs) self._delete('ports', port['port']['id']) # IPv6 cidr address_pairs = [{'ip_address': '1001::/64'}] res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) address_pairs[0]['mac_address'] = port['port']['mac_address'] self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], address_pairs) self._delete('ports', port['port']['id']) # Illegal IPv6 cidr address_pairs = [{'ip_address': '1001::12/64'}] res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) self.assertIn('NeutronError', port) def test_update_add_bad_address_pairs_with_cidr(self): with self.network() as net: res = self._create_port(self.fmt, net['network']['id']) port = self.deserialize(self.fmt, res) address_pairs = [{'mac_address': '00:00:00:00:00:01', 'ip_address': '10.0.0.1/24'}] update_port = {'port': {addr_apidef.ADDRESS_PAIRS: address_pairs}} req = self.new_update_request('ports', update_port, port['port']['id']) res = req.get_response(self.api) self.assertEqual(res.status_int, 400) self._delete('ports', port['port']['id']) def test_create_port_security_false_allowed_address_pairs(self): self.skipTest('TBD') class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase, ext_pairs.TestAllowedAddressPairs): def setUp(self, plugin='vmware_nsx.plugin.NsxVPlugin', ext_mgr=None, service_plugins=None): super(TestAllowedAddressPairsNSXv, self).setUp( plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins) def test_create_port_security_false_allowed_address_pairs(self): self.skipTest('TBD') def test_update_port_security_off_address_pairs(self): self.skipTest('Not supported') def test_create_overlap_with_fixed_ip(self): address_pairs = [{'ip_address': '10.0.0.2'}] with self.network() as network: with self.subnet(network=network, cidr='10.0.0.0/24', enable_dhcp=False) as subnet: fixed_ips = [{'subnet_id': subnet['subnet']['id'], 'ip_address': '10.0.0.2'}] res = self._create_port(self.fmt, network['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS, 'fixed_ips'), allowed_address_pairs=address_pairs, fixed_ips=fixed_ips) self.assertEqual(res.status_int, 201) port = self.deserialize(self.fmt, res) self._delete('ports', port['port']['id']) def test_create_port_allowed_address_pairs(self): with self.network() as net: address_pairs = [{'ip_address': '10.0.0.1'}] res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) address_pairs[0]['mac_address'] = port['port']['mac_address'] self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], address_pairs) self._delete('ports', port['port']['id']) def _test_create_port_remove_allowed_address_pairs(self, update_value): with self.network() as net: address_pairs = [{'ip_address': '10.0.0.1'}] res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_apidef.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) update_port = {'port': {addr_apidef.ADDRESS_PAIRS: []}} req = self.new_update_request('ports', update_port, port['port']['id']) port = self.deserialize(self.fmt, req.get_response(self.api)) self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], []) self._delete('ports', port['port']['id']) def test_update_add_address_pairs(self): with self.network() as net: res = self._create_port(self.fmt, net['network']['id']) port = self.deserialize(self.fmt, res) address_pairs = [{'ip_address': '10.0.0.1'}] update_port = {'port': {addr_apidef.ADDRESS_PAIRS: address_pairs}} req = self.new_update_request('ports', update_port, port['port']['id']) port = self.deserialize(self.fmt, req.get_response(self.api)) address_pairs[0]['mac_address'] = port['port']['mac_address'] self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], address_pairs) self._delete('ports', port['port']['id']) def test_mac_configuration(self): address_pairs = [{'mac_address': '00:00:00:00:00:01', 'ip_address': '10.0.0.1'}] self._create_port_with_address_pairs(address_pairs, 400) def test_equal_to_max_allowed_address_pair(self): cfg.CONF.set_default('max_allowed_address_pair', 3) address_pairs = [{'ip_address': '10.0.0.1'}, {'ip_address': '10.0.0.2'}, {'ip_address': '10.0.0.3'}] self._create_port_with_address_pairs(address_pairs, 201) def test_create_port_security_true_allowed_address_pairs(self): if self._skip_port_security: self.skipTest("Plugin does not implement port-security extension") with self.network() as net: address_pairs = [{'ip_address': '10.0.0.1'}] res = self._create_port(self.fmt, net['network']['id'], arg_list=('port_security_enabled', addr_apidef.ADDRESS_PAIRS,), port_security_enabled=True, allowed_address_pairs=address_pairs) port = self.deserialize(self.fmt, res) self.assertTrue(port['port'][psec.PORTSECURITY]) address_pairs[0]['mac_address'] = port['port']['mac_address'] self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS], address_pairs) self._delete('ports', port['port']['id'])