NSXv: Remove redundant code to check for duplicate rules

Since Id4906cbdebd820d3349a4a3211ebb34491516c68, the plugin doesn't need
to check for duplicate security-group rule as the base class will do
this check.

Also explicitly set port-security to default value if not specified in
the request body for create_network, otherwise, some unittest may fail.

Change-Id: I9f44e16616da7a2b79220ce6f37318bb50985470
This commit is contained in:
Roey Chen 2016-04-14 05:14:09 -07:00
parent 9d6cd0c735
commit 126ffd5c24
2 changed files with 1 additions and 38 deletions

View File

@ -773,6 +773,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
dvs_net_ids.append(self._get_vlan_network_name(
net_data, dvs_id))
try:
net_data[psec.PORTSECURITY] = net_data.get(psec.PORTSECURITY, True)
# Create SpoofGuard policy for network anti-spoofing
if cfg.CONF.nsxv.spoofguard_enabled and backend_network:
sg_policy_id = None
@ -2518,13 +2519,6 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Failed to delete security group rule"))
def _check_for_duplicate_rules(self, context, rules):
# Remove rule id's before comparing between rules
rules = [{'security_group_rule':
{k: v for k, v in six.iteritems(r['security_group_rule'])
if k != 'id'}} for r in rules]
super(NsxVPluginV2, self)._check_for_duplicate_rules(context, rules)
def _remove_vnic_from_spoofguard_policy(self, session, net_id, vnic_id):
policy_id = nsxv_db.get_spoofguard_policy_id(session, net_id)
self.nsx_v.vcns.inactivate_vnic_assigned_addresses(policy_id, vnic_id)

View File

@ -41,7 +41,6 @@ from neutron.tests.unit import testlib_api
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_utils import uuidutils
import six
import webob.exc
@ -2957,36 +2956,6 @@ class NsxVTestSecurityGroup(ext_sg.TestSecurityGroups,
# (self.fc2.remove_member_from_security_group
# .assert_called_once_with(nsx_sg_id, vnic_id))
def test_skip_duplicate_default_sg_error(self):
num_called = [0]
original_func = self.plugin.create_security_group
def side_effect(context, security_group, default_sg):
# can't always raise, or create_security_group will hang
self.assertTrue(default_sg)
self.assertTrue(num_called[0] < 2)
num_called[0] += 1
ret = original_func(context, security_group, default_sg)
if num_called[0] == 1:
return ret
# make another call to cause an exception.
# NOTE(yamamoto): raising the exception by ourselves
# doesn't update the session state appropriately.
self.assertRaises(db_exc.DBDuplicateEntry(),
original_func, context, security_group,
default_sg)
with mock.patch.object(self.plugin,
'create_security_group',
side_effect=side_effect):
self.plugin.create_network(
context.get_admin_context(),
{'network': {'name': 'foo',
'admin_state_up': True,
'shared': False,
'tenant_id': 'bar',
'port_security_enabled': True}})
def test_create_secgroup_deleted_upon_fw_section_create_fail(self):
_context = context.Context('', 'tenant_id')
sg = {'security_group': {'name': 'default',