Delete SG rules when deleting their remote group

Subscribe to a callback that will be called upon security group deletion,
and will look for rules with this SG as their remote-group-id and
delete them explicitly.
Else, those rules will only be deleted from the neutron DB, without
the NSX backend being aware of this.

This code was added in the common plugin, and will affect the V, V3 & P plugins.

Change-Id: Ie01dc29efaa3bf30ac314f45542d83f5a4cf238f
This commit is contained in:
Adit Sarfaty 2019-04-07 14:10:58 +03:00
parent c29c51d2fb
commit 1ca92978ab
3 changed files with 89 additions and 0 deletions

View File

@ -445,9 +445,26 @@ def _validate_network_has_subnet(resource, event, trigger, **kwargs):
raise n_exc.InvalidInput(error_message=msg)
def _delete_sg_group_related_rules(resource, event, trigger, **kwargs):
"""Upon SG deletion, call the explicit delete method for rules with that
SG as the remote one.
Otherwise those will be deleted with on_delete cascade, leaving the NSX
backend unaware.
"""
sg_id = kwargs["security_group"]["id"]
context = kwargs["context"]
core_plugin = directory.get_plugin()
filters = {'remote_group_id': [sg_id]}
rules = core_plugin.get_security_group_rules(context, filters=filters)
for rule in rules:
core_plugin.delete_security_group_rule(context, rule["id"])
def subscribe():
registry.subscribe(_validate_network_has_subnet,
resources.ROUTER_GATEWAY, events.BEFORE_CREATE)
registry.subscribe(_delete_sg_group_related_rules,
resources.SECURITY_GROUP, events.PRECOMMIT_DELETE)
subscribe()

View File

@ -48,6 +48,7 @@ from neutron_lib.plugins import directory
from vmware_nsx.common import utils
from vmware_nsx.extensions import providersecuritygroup as provider_sg
from vmware_nsx.plugins.common import plugin as com_plugin
from vmware_nsx.plugins.nsx_p import plugin as nsx_plugin
from vmware_nsx.tests import unit as vmware
from vmware_nsx.tests.unit.common_plugin import common_v3
@ -1450,6 +1451,41 @@ class NsxPTestSecurityGroup(common_v3.FixExternalNetBaseTest,
scope=scope,
logged=False)
def test_create_security_group_rule_with_remote_group(self):
with self.security_group() as sg1, self.security_group() as sg2:
security_group_id = sg1['security_group']['id']
direction = "ingress"
remote_group_id = sg2['security_group']['id']
protocol = "tcp"
keys = [('remote_group_id', remote_group_id),
('security_group_id', security_group_id),
('direction', direction),
('protocol', protocol)]
with self.security_group_rule(
security_group_id, direction=direction, protocol=protocol,
remote_group_id=remote_group_id) as rule:
for k, v, in keys:
self.assertEqual(rule['security_group_rule'][k], v)
def test_delete_security_group_rule_with_remote_group(self):
com_plugin.subscribe()
with self.security_group() as sg1, self.security_group() as sg2:
security_group_id = sg1['security_group']['id']
direction = "ingress"
remote_group_id = sg2['security_group']['id']
protocol = "tcp"
with self.security_group_rule(
security_group_id, direction=direction, protocol=protocol,
remote_group_id=remote_group_id) as rule,\
mock.patch.object(
self.plugin, "delete_security_group_rule") as del_rule:
# delete sg2
self._delete('security-groups', remote_group_id,
exc.HTTPNoContent.code)
# verify the rule was deleted
del_rule.assert_called_once_with(
mock.ANY, rule["security_group_rule"]["id"])
class NsxPTestL3ExtensionManager(object):

View File

@ -79,6 +79,7 @@ from vmware_nsx.extensions import projectpluginmap
from vmware_nsx.extensions import routersize as router_size
from vmware_nsx.extensions import routertype as router_type
from vmware_nsx.extensions import vnicindex as ext_vnic_idx
from vmware_nsx.plugins.common import plugin as com_plugin
from vmware_nsx.plugins.nsx_v import availability_zones as nsx_az
from vmware_nsx.plugins.nsx_v.drivers import (
distributed_router_driver as dist_router_driver)
@ -4078,6 +4079,41 @@ class NsxVTestSecurityGroup(ext_sg.TestSecurityGroups,
def test_create_security_group_rule_protocol_as_number_with_port(self):
self.skipTest('not supported')
def test_create_security_group_rule_with_remote_group(self):
with self.security_group() as sg1, self.security_group() as sg2:
security_group_id = sg1['security_group']['id']
direction = "ingress"
remote_group_id = sg2['security_group']['id']
protocol = "tcp"
keys = [('remote_group_id', remote_group_id),
('security_group_id', security_group_id),
('direction', direction),
('protocol', protocol)]
with self.security_group_rule(
security_group_id, direction=direction, protocol=protocol,
remote_group_id=remote_group_id) as rule:
for k, v, in keys:
self.assertEqual(rule['security_group_rule'][k], v)
def test_delete_security_group_rule_with_remote_group(self):
com_plugin.subscribe()
with self.security_group() as sg1, self.security_group() as sg2:
security_group_id = sg1['security_group']['id']
direction = "ingress"
remote_group_id = sg2['security_group']['id']
protocol = "tcp"
with self.security_group_rule(
security_group_id, direction=direction, protocol=protocol,
remote_group_id=remote_group_id) as rule,\
mock.patch.object(
self.plugin, "delete_security_group_rule") as del_rule:
# delete sg2
self._delete('security-groups', remote_group_id,
webob.exc.HTTPNoContent.code)
# verify the rule was deleted
del_rule.assert_called_once_with(
mock.ANY, rule["security_group_rule"]["id"])
class TestVdrTestCase(L3NatTest, L3NatTestCaseBase,
test_l3_plugin.L3NatDBIntTestCase,