Merge "[OVN] Remove ACLs with remote SG during deletion of SG"

This commit is contained in:
Zuul 2022-08-24 17:32:38 +00:00 committed by Gerrit Code Review
commit a2249b3cd3
2 changed files with 56 additions and 9 deletions

View File

@ -263,6 +263,9 @@ class OVNMechanismDriver(api.MechanismDriver):
registry.subscribe(self._create_security_group,
resources.SECURITY_GROUP,
events.AFTER_CREATE)
registry.subscribe(self._delete_security_group_precommit,
resources.SECURITY_GROUP,
events.PRECOMMIT_DELETE)
registry.subscribe(self._delete_security_group,
resources.SECURITY_GROUP,
events.AFTER_DELETE)
@ -438,6 +441,14 @@ class OVNMechanismDriver(api.MechanismDriver):
self._ovn_client.create_security_group(context,
security_group)
def _delete_security_group_precommit(self, resource, event, trigger,
payload):
context = n_context.get_admin_context()
security_group_id = payload.resource_id
for sg_rule in self._plugin.get_security_group_rules(
context, filters={'remote_group_id': [security_group_id]}):
self._ovn_client.delete_security_group_rule(context, sg_rule)
def _delete_security_group(self, resource, event, trigger, payload):
context = payload.context
security_group_id = payload.resource_id

View File

@ -854,6 +854,12 @@ class TestSecurityGroup(base.TestOVNFunctionalBase):
super(TestSecurityGroup, self).setUp()
self._ovn_client = self.mech_driver._ovn_client
self.plugin = self.mech_driver._plugin
self.sg_data = {
'name': 'testsg',
'description': 'Test Security Group',
'tenant_id': self._tenant_id,
'is_default': True,
}
def _find_acls_for_sg(self, sg_id):
rows = self.nb_api.db_find_rows('ACL').execute(check_error=True)
@ -870,6 +876,13 @@ class TestSecurityGroup(base.TestOVNFunctionalBase):
return [r for r in rows if get_api_id(r) in rule_ids]
return []
def _find_acl_remote_sg(self, remote_sg_id):
# NOTE: the ACL to be found has ethertype=IPv4 and protocol=ICMP.
sg_match = '$pg_' + remote_sg_id.replace('-', '_') + '_ip4 && icmp4'
for row in self.nb_api.db_find_rows('ACL').execute(check_error=True):
if sg_match in row.match:
return row
def test_sg_stateful_toggle_updates_ovn_acls(self):
def check_acl_actions(sg_id, expected):
self.assertEqual(
@ -877,21 +890,15 @@ class TestSecurityGroup(base.TestOVNFunctionalBase):
set(a.action for a in self._find_acls_for_sg(sg_id))
)
sg_data = {
'name': 'testsg',
'description': 'Test Security Group',
'tenant_id': self._tenant_id,
'is_default': True,
}
sg = self.plugin.create_security_group(
self.context, security_group={'security_group': sg_data})
self.context, security_group={'security_group': self.sg_data})
check_acl_actions(sg['id'], 'allow-related')
def update_sg(stateful):
sg_data['stateful'] = stateful
self.sg_data['stateful'] = stateful
self.plugin.update_security_group(
self.context, sg['id'],
security_group={'security_group': sg_data})
security_group={'security_group': self.sg_data})
update_sg(False)
check_acl_actions(sg['id'], 'allow-stateless')
@ -902,6 +909,35 @@ class TestSecurityGroup(base.TestOVNFunctionalBase):
update_sg(False)
check_acl_actions(sg['id'], 'allow-stateless')
def test_remove_sg_with_related_rule_remote_sg(self):
self.sg_data['is_default'] = False
sg1 = self.plugin.create_security_group(
self.context, security_group={'security_group': self.sg_data})
sg2 = self.plugin.create_security_group(
self.context, security_group={'security_group': self.sg_data})
rule_data = {'direction': constants.INGRESS_DIRECTION,
'ethertype': constants.IPv4,
'protocol': constants.PROTO_NAME_ICMP,
'port_range_max': None,
'port_range_min': None,
'remote_ip_prefix': None,
'tenant_id': sg1['project_id'],
'remote_address_group_id': None,
'security_group_id': sg1['id'],
'remote_group_id': sg2['id']}
sg_rule = {'security_group_rule': rule_data}
rule = self.plugin.create_security_group_rule(self.context, sg_rule)
acl = self._find_acl_remote_sg(sg2['id'])
self.assertEqual(rule['id'],
acl.external_ids[ovn_const.OVN_SG_RULE_EXT_ID_KEY])
acls = self._find_acls_for_sg(sg1['id'])
self.assertEqual(3, len(acls))
self.plugin.delete_security_group(self.context, sg2['id'])
self.assertIsNone(self._find_acl_remote_sg(sg2['id']))
acls = self._find_acls_for_sg(sg1['id'])
self.assertEqual(2, len(acls))
class TestProvnetPorts(base.TestOVNFunctionalBase):