Merge "[OVN] Fix overlapping security group objects not correctly applied"

This commit is contained in:
Zuul 2022-01-28 17:33:55 +00:00 committed by Gerrit Code Review
commit 3033b8b6b7
2 changed files with 81 additions and 4 deletions

View File

@ -215,9 +215,23 @@ class OVNDriver(base.DriverBase):
"""
if not log_obj.resource_id and not log_obj.target_id:
# No sg, no port: return all pgs
return self._pgs_all()
# No sg, no port, ALL: return all pgs
if log_obj.event == log_const.ALL_EVENT:
return self._pgs_all()
try:
pg_drop = self.ovn_nb.lookup("Port_Group",
ovn_const.OVN_DROP_PORT_GROUP_NAME)
# No sg, no port, DROP: return DROP pg
if log_obj.event == log_const.DROP_EVENT:
return [{"name": pg_drop.name,
"acls": [r.uuid for r in pg_drop.acls]}]
# No sg, no port, ACCEPT: return all except DROP pg
pgs = self._pgs_all()
pgs.remove({"name": pg_drop.name,
"acls": [r.uuid for r in pg_drop.acls]})
return pgs
except idlutils.RowNotFound:
pass
pgs = []
# include special pg_drop to log DROP and ALL actions
if not log_obj.event or log_obj.event in (log_const.DROP_EVENT,
@ -229,6 +243,8 @@ class OVNDriver(base.DriverBase):
"acls": [r.uuid for r in pg.acls]})
except idlutils.RowNotFound:
pass
if log_obj.event == log_const.DROP_EVENT:
return pgs
if log_obj.resource_id:
try:

View File

@ -41,7 +41,8 @@ class LogApiTestCaseBase(functional_base.TestOVNFunctionalBase):
'resource_type': 'security_group',
'description': 'test net log',
'name': 'logme',
'enabled': enabled}
'enabled': enabled,
'event': log_const.ALL_EVENT}
if sg_id:
log_data['resource_id'] = sg_id
if port_id:
@ -152,6 +153,14 @@ class LogApiTestCaseComplex(LogApiTestCaseBase):
self.assertEqual(is_enabled, acl.log)
return acl
def _check_acl_log_drop(self, is_enabled=True):
acls = self.nb_api.get_port_group(
ovn_const.OVN_DROP_PORT_GROUP_NAME).acls
self.assertTrue(acls)
for acl in acls:
self.assertEqual(is_enabled, acl.log)
return acls
def _check_sgrs(self, sgrs=None, is_enabled=True):
if not sgrs:
sgrs = self.sgrs
@ -277,3 +286,55 @@ class LogApiTestCaseComplex(LogApiTestCaseBase):
self._check_sgrs(is_enabled=False)
self.assertEqual([],
self.nb_api.meter_list().execute(check_error=True))
def _add_logs_then_remove(self, event1, event2, sg=None, sgrs=None):
# Events were previously not correctly applied on ACLs. This test
# ensures that each event log only the necessary acls
drop_true_events = (log_const.DROP_EVENT, log_const.ALL_EVENT)
accept_true_events = (log_const.ALL_EVENT, log_const.ACCEPT_EVENT)
# Check there are no acls with their logging active
self._check_sgrs(sgrs=sgrs, is_enabled=False)
self._check_acl_log_drop(is_enabled=False)
# Add first log object
log_data1 = self._log_data(sg_id=sg)
log_data1['log']['event'] = event1
log_obj1 = self.log_plugin.create_log(self.ctxt, log_data1)
self._check_acl_log_drop(is_enabled=event1 in drop_true_events)
self._check_sgrs(sgrs=sgrs, is_enabled=event1 in accept_true_events)
# Add second log object
log_data2 = self._log_data(sg_id=sg)
log_data2['log']['event'] = event2
log_obj2 = self.log_plugin.create_log(self.ctxt, log_data2)
self._check_acl_log_drop(is_enabled=(event1 in drop_true_events or
event2 in drop_true_events))
self._check_sgrs(sgrs=sgrs, is_enabled=(event1 in accept_true_events or
event2 in accept_true_events))
# Delete second log object
self.log_plugin.delete_log(self.ctxt, log_obj2['id'])
self._check_acl_log_drop(is_enabled=event1 in drop_true_events)
self._check_sgrs(sgrs=sgrs, is_enabled=event1 in accept_true_events)
# Delete first log object
self.log_plugin.delete_log(self.ctxt, log_obj1['id'])
self._check_sgrs(sgrs=sgrs, is_enabled=False)
self._check_acl_log_drop(is_enabled=False)
def test_events_all_sg(self):
self._add_logs_then_remove(log_const.DROP_EVENT, log_const.ALL_EVENT)
self._add_logs_then_remove(
log_const.ACCEPT_EVENT, log_const.DROP_EVENT)
self._add_logs_then_remove(
log_const.DROP_EVENT, log_const.ACCEPT_EVENT)
def test_events_one_sg(self):
self._add_logs_then_remove(log_const.DROP_EVENT, log_const.ALL_EVENT,
sg=self.sg1, sgrs=self.sg1rs)
self._add_logs_then_remove(
log_const.ACCEPT_EVENT, log_const.DROP_EVENT, sg=self.sg2,
sgrs=self.sg2rs)
self._add_logs_then_remove(
log_const.DROP_EVENT, log_const.ACCEPT_EVENT, sg=self.sg3,
sgrs=self.sg3rs)