Merge "NSX-V support updating port-security of a port"

This commit is contained in:
Jenkins 2016-06-23 08:22:36 +00:00 committed by Gerrit Code Review
commit eb88764461
2 changed files with 114 additions and 25 deletions

View File

@ -1309,8 +1309,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
port_data = port['port']
if addr_pair.ADDRESS_PAIRS in attrs:
self._validate_address_pairs(attrs, original_port)
has_port_security = (cfg.CONF.nsxv.spoofguard_enabled and
original_port[psec.PORTSECURITY])
orig_has_port_security = (cfg.CONF.nsxv.spoofguard_enabled and
original_port[psec.PORTSECURITY])
port_ip_change = port_data.get('fixed_ips') is not None
device_owner_change = port_data.get('device_owner') is not None
@ -1319,11 +1319,24 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
msg = (_('Cannot set fixed ips and device owner together for port '
'%s') % original_port['id'])
raise n_exc.BadRequest(resource='port', msg=msg)
# We do not support updating the port-security field (yet)
if psec.PORTSECURITY in port['port']:
msg = (_('Cannot modify the port security of port %s after port '
'creation') % original_port['id'])
raise NotImplementedError(msg)
# Check if port security has changed
port_sec_change = False
has_port_security = orig_has_port_security
if (psec.PORTSECURITY in port_data and
port_data[psec.PORTSECURITY] != original_port[psec.PORTSECURITY]):
port_sec_change = True
has_port_security = (cfg.CONF.nsxv.spoofguard_enabled and
port_data[psec.PORTSECURITY])
# We do not support modification of port security with other
# parameters (only with security groups) to reduce some of
# the complications
if (len(port_data.keys()) > 2 or
(ext_sg.SECURITYGROUPS not in port_data and
len(port_data.keys()) > 1)):
msg = (_('Cannot set port security together with other '
'attributes for port %s') % original_port['id'])
raise n_exc.BadRequest(resource='port', msg=msg)
# TODO(roeyc): create a method '_process_vnic_index_update' from the
# following code block
@ -1456,6 +1469,28 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
self._delete_port_vnic_index_mapping(context, id)
self._delete_dhcp_static_binding(context, original_port)
else:
# port security enabled / disabled
if port_sec_change:
self._process_port_port_security_update(
context, port_data, ret_port)
if has_port_security:
LOG.debug("Assigning vnic port fixed-ips: port %s, "
"vnic %s, with fixed-ips %s", id, vnic_id,
original_port['fixed_ips'])
self._update_vnic_assigned_addresses(
context.session, original_port, vnic_id)
# Remove vm from the exclusion list, since it now has
# port security
self._remove_vm_from_exclude_list(context, device_id,
id)
elif cfg.CONF.nsxv.spoofguard_enabled:
self._remove_vnic_from_spoofguard_policy(
context.session, original_port['network_id'],
vnic_id)
# Add vm to the exclusion list, since it has no port
# security now
self._add_vm_to_exclude_list(context, device_id, id)
# Update vnic with the newest approved IP addresses
if (has_port_security and
(updates_fixed_ips or update_assigned_addresses)):

View File

@ -25,7 +25,6 @@ from neutron.extensions import external_net
from neutron.extensions import l3
from neutron.extensions import l3_ext_gw_mode
from neutron.extensions import portbindings
from neutron.extensions import portsecurity as psec
from neutron.extensions import providernet as pnet
from neutron.extensions import securitygroup as secgrp
from neutron import manager
@ -3570,23 +3569,6 @@ class TestNSXPortSecurity(test_psec.TestPortSecurity,
def test_create_port_with_security_group_and_net_sec_false(self):
pass
def test_update_port_remove_port_security_security_group(self):
pass
def test_update_port_remove_port_security_security_group_read(self):
pass
def test_update_port_port_security_raise_not_implemented(self):
with self.network() as net:
with self.subnet(network=net) as sub:
with self.port(subnet=sub) as port:
update_port = {'port': {psec.PORTSECURITY: False}}
plugin = manager.NeutronManager.get_plugin()
self.assertRaises(NotImplementedError,
plugin.update_port,
context.get_admin_context(),
port['port']['id'], update_port)
def _create_compute_port(self, network_name, device_id, port_security):
# create a network without port security
res = self._create_network('json', network_name, True)
@ -3738,6 +3720,78 @@ class TestNSXPortSecurity(test_psec.TestPortSecurity,
self._del_port_with_vnic(port2['port']['id'], False)
self._del_port_with_vnic(port1['port']['id'], True)
def _toggle_port_security(self, port_id, enable_port_security,
update_exclude):
"""Enable/disable port security on a port, and verify that the exclude
list was updated as expected
"""
plugin = self._get_core_plugin_with_dvs()
vm_moref = 'dummy_moref'
data = {'port': {'port_security_enabled': enable_port_security}}
with mock.patch.object(plugin._dvs, 'get_vm_moref',
return_value=vm_moref):
if enable_port_security:
with mock.patch.object(
plugin.nsx_v.vcns,
'delete_vm_from_exclude_list') as exclude_list_del:
self.new_update_request(
'ports', data, port_id).get_response(self.api)
if update_exclude:
# make sure the vm was added to the exclude list
exclude_list_del.assert_called_once_with(vm_moref)
else:
self.assertFalse(exclude_list_del.called)
else:
with mock.patch.object(
plugin.nsx_v.vcns,
'add_vm_to_exclude_list') as exclude_list_add:
self.new_update_request(
'ports', data, port_id).get_response(self.api)
if update_exclude:
# make sure the vm was added to the exclude list
exclude_list_add.assert_called_once_with(vm_moref)
else:
self.assertFalse(exclude_list_add.called)
def test_update_port_security_with_vnic(self):
device_id = _uuid()
# create a compute port without port security
port = self._create_compute_port('net1', device_id, False)
# add vnic to the port
self._add_vnic_to_port(port['port']['id'], True, 3)
# enable port security
self._toggle_port_security(port['port']['id'], True, True)
# disable port security
self._toggle_port_security(port['port']['id'], False, True)
# delete vnic from the port
self._del_vnic_from_port(port['port']['id'], True)
def test_update_multiple_port_security_with_vnic(self):
device_id = _uuid()
# create a compute port without port security
port1 = self._create_compute_port('net1', device_id, False)
# add vnic to the port
self._add_vnic_to_port(port1['port']['id'], True, 3)
# create another compute port without port security
port2 = self._create_compute_port('net2', device_id, False)
# add vnic to the port
self._add_vnic_to_port(port2['port']['id'], False, 4)
# enable port security on both ports
self._toggle_port_security(port1['port']['id'], True, False)
self._toggle_port_security(port2['port']['id'], True, True)
# disable port security on both ports
self._toggle_port_security(port1['port']['id'], False, True)
self._toggle_port_security(port2['port']['id'], False, False)
class TestSharedRouterTestCase(L3NatTest, L3NatTestCaseBase,
test_l3_plugin.L3NatTestCaseMixin,