From e11acdcd095ee413b1daac7f1d2d73ea68da78b8 Mon Sep 17 00:00:00 2001 From: Adit Sarfaty Date: Mon, 8 Apr 2019 11:34:08 +0300 Subject: [PATCH] NSX|P: Fix removal of provider security groups from port Change-Id: I74e39328858fe91515f7796d3bef80de934843c4 --- vmware_nsx/plugins/nsx_p/plugin.py | 1 + vmware_nsx/tests/unit/nsx_p/test_plugin.py | 64 ++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/vmware_nsx/plugins/nsx_p/plugin.py b/vmware_nsx/plugins/nsx_p/plugin.py index eabfd58e3b..66f763c9f1 100644 --- a/vmware_nsx/plugins/nsx_p/plugin.py +++ b/vmware_nsx/plugins/nsx_p/plugin.py @@ -1131,6 +1131,7 @@ class NsxPolicyPlugin(nsx_plugin_common.NsxPluginV3Base): # for notifications original_port = super(NsxPolicyPlugin, self).get_port( context, port_id) + self._remove_provider_security_groups_from_list(original_port) port_data = port['port'] self._validate_update_port(context, port_id, original_port, port_data) diff --git a/vmware_nsx/tests/unit/nsx_p/test_plugin.py b/vmware_nsx/tests/unit/nsx_p/test_plugin.py index d24974b57a..b63dd14987 100644 --- a/vmware_nsx/tests/unit/nsx_p/test_plugin.py +++ b/vmware_nsx/tests/unit/nsx_p/test_plugin.py @@ -47,6 +47,7 @@ from neutron_lib.objects import registry as obj_reg from neutron_lib.plugins import directory from vmware_nsx.common import utils +from vmware_nsx.extensions import providersecuritygroup as provider_sg from vmware_nsx.plugins.nsx_p import plugin as nsx_plugin from vmware_nsx.tests import unit as vmware from vmware_nsx.tests.unit.common_plugin import common_v3 @@ -1278,6 +1279,9 @@ class NsxPTestSecurityGroup(common_v3.FixExternalNetBaseTest, super(NsxPTestSecurityGroup, self).setUp(plugin=plugin, ext_mgr=ext_mgr) self.project_id = test_db_base_plugin_v2.TEST_TENANT_ID + # add provider group attributes + secgrp.Securitygroup().update_attributes_map( + provider_sg.EXTENDED_ATTRIBUTES_2_0) def test_create_security_group_rule_icmp_with_type_and_code(self): """No non-zero icmp codes are currently supported by the NSX""" @@ -1354,6 +1358,66 @@ class NsxPTestSecurityGroup(common_v3.FixExternalNetBaseTest, tags=mock.ANY, category=policy_constants.CATEGORY_ENVIRONMENT) + def _create_provider_security_group(self): + body = {'security_group': {'name': 'provider-deny', + 'tenant_id': self._tenant_id, + 'description': 'provider sg', + 'provider': True}} + security_group_req = self.new_create_request('security-groups', body) + return self.deserialize(self.fmt, + security_group_req.get_response(self.ext_api)) + + def test_provider_sg_on_port(self): + psg = self._create_provider_security_group() + with mock.patch("vmware_nsxlib.v3.policy.core_resources." + "NsxPolicySegmentPortApi.create_or_overwrite" + ) as port_create: + with self.port(tenant_id=self._tenant_id) as port: + # make sure the port has the provider sg + port_data = port['port'] + self.assertEqual(1, len(port_data['provider_security_groups'])) + self.assertEqual(psg['security_group']['id'], + port_data['provider_security_groups'][0]) + + # Make sure the correct security groups tags were set + port_create.assert_called_once() + actual_tags = port_create.call_args[1]['tags'] + sg_tags = 0 + psg_tag_found = False + for tag in actual_tags: + if tag['scope'] == 'os-security-group': + sg_tags += 1 + if tag['tag'] == psg['security_group']['id']: + psg_tag_found = True + self.assertEqual(2, sg_tags) + self.assertTrue(psg_tag_found) + + def test_remove_provider_sg_from_port(self): + psg = self._create_provider_security_group() + with self.port(tenant_id=self._tenant_id) as port: + with mock.patch("vmware_nsxlib.v3.policy.core_resources." + "NsxPolicySegmentPortApi.create_or_overwrite" + ) as port_update: + # specifically remove the provider sg from the port + data = {'port': {'provider_security_groups': []}} + req = self.new_update_request('ports', + data, port['port']['id']) + res = self.deserialize('json', req.get_response(self.api)) + self.assertEqual(0, + len(res['port']['provider_security_groups'])) + # Make sure the correct security groups tags were set + port_update.assert_called_once() + actual_tags = port_update.call_args[1]['tags'] + sg_tags = 0 + psg_tag_found = False + for tag in actual_tags: + if tag['scope'] == 'os-security-group': + sg_tags += 1 + if tag['tag'] == psg['security_group']['id']: + psg_tag_found = True + self.assertEqual(1, sg_tags) + self.assertFalse(psg_tag_found) + def test_sg_rule_create_on_nsx(self): """Verify that a comm-map entry is created for a new SG rule """ name = description = 'sg1'