Consider logging options when using OVNdbsync

Previously, OVN db sync would erase an ACL if any unexpected property
appeared on it and not recreate it again. This happened because of the
order of deletion and creation of the ACLS: the new ACL was first
created and then deleted just the moment after that. This meant that
even crucial ACLs like the ones bounded to the pg_drop port group, which
are used to reject all the traffic by default on ML2/OVN environments,
would dissapear. The order of the ACL deletion and creation has been
inverted to avoid this.

Furthermore, security group logging was not supported on the
ovn_db_sync script, which would also cause the logging parameters to
dissapear. Now, the logging options are considered when doing a sync.

Closes-Bug: #2107925
Change-Id: I00fa8332fdebc958ddb8f28c638670c75a70e0c5
Signed-off-by: Elvira Garcia <egarciar@redhat.com>
This commit is contained in:
Elvira
2025-05-27 19:11:46 +00:00
committed by Elvira Garcia
parent 18dff1b31a
commit 1cf5b6de7c
9 changed files with 302 additions and 39 deletions

View File

@ -148,7 +148,8 @@ def add_acls_for_drop_port_group(pg_name):
"name": [],
"severity": [],
"direction": direction,
"match": f'{p} == @{pg_name} && ip'}
"match": f'{p} == @{pg_name} && ip',
"meter": []}
acl_list.append(acl)
return acl_list
@ -167,6 +168,7 @@ def drop_all_ip_traffic_for_port(port):
"severity": [],
"direction": direction,
"match": '{} == "{}" && ip'.format(p, port['id']),
"meter": [],
"external_ids": {'neutron:lport': port['id']}}
acl_list.append(acl)
return acl_list
@ -187,6 +189,7 @@ def add_sg_rule_acl_for_port_group(port_group, r, stateful, match):
"severity": [],
"direction": dir_map[r['direction']],
"match": match,
"meter": [],
ovn_const.OVN_SG_RULE_EXT_ID_KEY: r['id']}
return acl

View File

@ -259,7 +259,7 @@ INITIAL_REV_NUM = -1
ACL_EXPECTED_COLUMNS_NBDB = (
'external_ids', 'direction', 'log', 'priority',
'name', 'action', 'severity', 'match')
'name', 'action', 'severity', 'match', 'meter')
# Resource types
TYPE_NETWORKS = 'networks'

View File

@ -35,6 +35,7 @@ from neutron.objects.port_forwarding import PortForwarding
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb.extensions import qos \
as ovn_qos
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client
from neutron.services.logapi.drivers.ovn import driver as log_driver
from neutron.services.segments import db as segments_db
@ -82,6 +83,18 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
self.segments_plugin = (
manager.NeutronManager.load_class_for_provider(
'neutron.service_plugins', 'segments')())
self.log_plugin = directory.get_plugin(plugin_constants.LOG_API)
if not self.log_plugin:
self.log_plugin = (
manager.NeutronManager.load_class_for_provider(
'neutron.service_plugins', 'log')())
directory.add_plugin(plugin_constants.LOG_API, self.log_plugin)
for driver in self.log_plugin.driver_manager.drivers:
if driver.name == "ovn":
self.ovn_log_driver = driver
if not hasattr(self, 'ovn_log_driver'):
self.ovn_log_driver = log_driver.OVNDriver()
self.log_plugin.driver_manager.register_driver(self.ovn_log_driver)
def stop(self):
if utils.is_ovn_l3(self.l3_plugin):
@ -233,6 +246,9 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
def _get_acls_from_port_groups(self):
ovn_acls = []
# Options and label columns are only present for OVN >= 22.03.
# Furthermore label is a randint so it cannot be compared with any
# expected neutron value. They are added later on the ACL addition.
acl_columns = (self.ovn_api._tables['ACL'].columns.keys() &
set(ovn_const.ACL_EXPECTED_COLUMNS_NBDB))
acl_columns.discard('external_ids')
@ -244,7 +260,16 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
acl_string['port_group'] = pg.name
if id_key in acl.external_ids:
acl_string[id_key] = acl.external_ids[id_key]
# This properties are present as lists of one item,
# converting them to string.
if acl_string['name']:
acl_string['name'] = acl_string['name'][0]
if acl_string['meter']:
acl_string['meter'] = acl_string['meter'][0]
if acl_string['severity']:
acl_string['severity'] = acl_string['severity'][0]
ovn_acls.append(acl_string)
return ovn_acls
def sync_acls(self, ctx):
@ -274,6 +299,10 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
neutron_default_acls = acl_utils.add_acls_for_drop_port_group(
ovn_const.OVN_DROP_PORT_GROUP_NAME)
# Add logging options
self.ovn_log_driver.add_logging_options_to_acls(neutron_acls, ctx)
self.ovn_log_driver.add_logging_options_to_acls(neutron_default_acls,
ctx)
ovn_acls = self._get_acls_from_port_groups()
# Sort the acls in the ovn database according to the security
# group rule id for easy comparison in the future.
@ -304,14 +333,24 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
o_index += 1
elif n_id == o_id:
if any(item not in na.items() for item in oa.items()):
for item in oa.items():
if item not in na.items():
LOG.warning('Property %(item)s from OVN ACL not '
'found in Neutron ACL: %(n_acl)s',
{'item': item,
'n_acl': na})
add_acls.append(na)
remove_acls.append(oa)
n_index += 1
o_index += 1
elif n_id > o_id:
LOG.warning('ACL should not be present in OVN, removing'
'%(acl)s', {'acl': oa})
remove_acls.append(oa)
o_index += 1
else:
LOG.warning('ACL should be present in OVN but is not, adding:'
'%(acl)s', {'acl': na})
add_acls.append(na)
n_index += 1
@ -351,32 +390,6 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
if (self.mode == ovn_const.OVN_DB_SYNC_MODE_REPAIR and
(num_acls_to_add or num_acls_to_remove)):
one_time_pg_resync = True
while True:
try:
with self.ovn_api.transaction(check_error=True) as txn:
for acla in neutron_acls:
LOG.warning('ACL found in Neutron but not in '
'OVN NB DB for port group %s',
acla['port_group'])
txn.add(self.ovn_api.pg_acl_add(
**acla, may_exist=True))
except idlutils.RowNotFound as row_err:
if row_err.msg.startswith("Cannot find Port_Group"):
if one_time_pg_resync:
LOG.warning('Port group row was not found during '
'ACLs sync. Will attempt to sync port '
'groups one more time. The caught '
'exception is: %s', row_err)
self.sync_port_groups(ctx)
one_time_pg_resync = False
continue
LOG.error('Port group exception during ACL sync '
'even after one more port group resync. '
'The caught exception is: %s', row_err)
else:
raise
break
with self.ovn_api.transaction(check_error=True) as txn:
for aclr in ovn_acls:
LOG.warning('ACLs found in OVN NB DB but not in '
@ -393,6 +406,41 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
LOG.warning('Removing ACLs from OVN from Logical '
'Switch %s', aclr[0])
txn.add(self.ovn_api.acl_del(aclr[0]))
while True:
try:
with self.ovn_api.transaction(check_error=True) as txn:
for acla in neutron_acls:
LOG.warning('ACL found in Neutron but not in '
'OVN NB DB for port group %s',
acla['port_group'])
acl = txn.add(self.ovn_api.pg_acl_add(**acla,
may_exist=True))
# We need to do this now since label should be
# random and not 0. We can use options as a way
# to see if label is supported or not.
if acla.get('log'):
self.ovn_log_driver.add_label_related(acla,
ctx)
txn.add(self.ovn_api.db_set('ACL', acl,
label=acla['label'],
options=acla['options']))
except idlutils.RowNotFound as row_err:
if row_err.msg.startswith("Cannot find Port_Group"):
if one_time_pg_resync:
LOG.warning('Port group row was not found during '
'ACLs sync. Will attempt to sync port '
'groups one more time. The caught '
'exception is: %s', row_err)
self.sync_port_groups(ctx)
one_time_pg_resync = False
continue
LOG.error('Port group exception during ACL sync '
'even after one more port group resync. '
'The caught exception is: %s', row_err)
else:
raise
break
LOG.debug('OVN-NB Sync ACLs completed @ %s', str(datetime.now()))

View File

@ -409,6 +409,80 @@ class OVNDriver(base.DriverBase):
with self.ovn_nb.transaction(check_error=True) as ovn_txn:
self._update_log_objs(context, ovn_txn, log_objs)
def add_logging_options_to_acls(self, neutron_acls, context):
log_objs = self._get_logs(context)
for log_obj in log_objs:
pgs = self._pgs_from_log_obj(context, log_obj)
actions_enabled = self._acl_actions_enabled(log_obj)
self._set_neutron_acls_log(pgs, context, actions_enabled,
utils.ovn_name(log_obj.id),
neutron_acls)
# This function is a version of set_acls_log meant to change neutron
# defined acls, mostly thought for ovndbsync consistency check.
def _set_neutron_acls_log(self, pgs, context, actions_enabled, log_name,
neutron_acls):
acl_changes, acl_visits = 0, 0
for pg in pgs:
meter_name = self.meter_name
if pg['name'] != ovn_const.OVN_DROP_PORT_GROUP_NAME:
sg = sg_obj.SecurityGroup.get_sg_by_id(context,
pg['external_ids'][ovn_const.OVN_SG_EXT_ID_KEY])
if not sg:
LOG.warning("Port Group %s is missing a corresponding "
"security group, skipping its network log "
"setting...", pg["name"])
continue
if not sg.stateful:
meter_name = meter_name + ("_stateless")
# We need to get the OVN ACL because UUID is not listed as a
# property on neutron defined ACLs (and it shouldn't), so we need
# to check which ACL is that UUID referring to, using match as
# differentiating value.
for acl in neutron_acls:
acl_visits += 1
# skip acls used by a different network log
n_acl_name = acl['name']
if n_acl_name and n_acl_name != log_name:
continue
action = acl['action'] in actions_enabled
acl['log'] = action
acl['meter'] = meter_name
acl['name'] = log_name
acl['severity'] = "info"
if acl.get('options'):
acl["options"] = {'log-related': "true"}
# label is not set because the actual number should not
# be compared or taken into account, we only need it to be
# different from 0.
acl_changes += 1
LOG.info("Set %d (out of %d visited) Neutron ACLs for network log %s",
acl_changes, acl_visits, log_name)
def _get_all_log_pgs(self, ctx):
"""Get all Port Group names associated to a Log Object.
:param log_plugin: Currently loaded log_plugging.
:param ctx: current running context information
"""
log_objs = self._get_logs(ctx)
log_pgs = []
for log_obj in log_objs:
log_pgs.extend(self._pgs_from_log_obj(ctx, log_obj))
return log_pgs
def add_label_related(self, n_acl, ctx):
# Get acls to be able to check if label is present in OVN ACLs and
# also check old label value for ACL if it was already present.
acls = [acl for pg in self._get_all_log_pgs(ctx) for acl in pg["acls"]]
if not acls:
return
acl = self.ovn_nb.lookup("ACL", acls[0], default=None)
if not hasattr(acl, 'label'):
return
n_acl["label"] = secrets.SystemRandom().randrange(1, MAX_INT_LABEL)
n_acl["options"] = {'log-related': 'true'}
def register(plugin_driver):
"""Register the driver."""

View File

@ -50,6 +50,7 @@ from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb.extensions import \
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
from neutron.plugins.ml2.drivers import type_geneve # noqa
from neutron import service # noqa
from neutron.services.logapi.drivers.ovn import driver as log_driver
from neutron.tests import base
from neutron.tests.common import base as common_base
from neutron.tests.common import helpers
@ -175,6 +176,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
self.addCleanup(exts.PluginAwareExtensionManager.clear_instance)
self.ovsdb_server_mgr = None
self._service_plugins = service_plugins
log_driver.DRIVER = None
super().setUp()
self.test_log_dir = os.path.join(DEFAULT_LOG_DIR, self.id())
base.setup_test_logging(
@ -197,6 +199,11 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
self.mech_driver.log_driver)
self.mech_driver.log_driver.plugin_driver = self.mech_driver
self.mech_driver.log_driver._log_plugin_property = None
for driver in self.log_plugin.driver_manager.drivers:
if driver.name == "ovn":
self.ovn_log_driver = driver
if not hasattr(self, 'ovn_log_driver'):
self.ovn_log_driver = log_driver.OVNDriver()
self.ovn_northd_mgr = None
self.maintenance_worker = maintenance_worker
mock.patch(

View File

@ -24,6 +24,7 @@ from neutron_lib.api.definitions import port_security as ps
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib.services.logapi import constants as log_const
from neutron_lib.services.qos import constants as qos_const
from oslo_config import cfg
from oslo_utils import uuidutils
@ -107,7 +108,7 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
nb_idl=self.nb_api)
def get_additional_service_plugins(self):
return {'qos': 'qos', 'segments': 'segments'}
return {'qos': 'qos', 'segments': 'segments', 'log': 'log'}
def _api_for_resource(self, resource):
if resource in ['security-groups']:
@ -1080,7 +1081,7 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
@staticmethod
def _build_acl_for_pgs(priority, direction, log, name, action,
severity, match, port_group, **kwargs):
severity, match, meter, port_group, **kwargs):
return {
'priority': priority,
'direction': direction,
@ -1089,6 +1090,7 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
'action': action,
'severity': severity,
'match': match,
'meter': meter,
'external_ids': kwargs}
def _validate_dhcp_opts(self, should_match=True):
@ -1161,6 +1163,9 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
ovn_const.OVN_DROP_PORT_GROUP_NAME):
db_acls.append(TestOvnNbSync._build_acl_for_pgs(**acl))
self.ovn_log_driver.add_logging_options_to_acls(db_acls,
self.ctx)
# Get the list of ACLs stored in the OVN plugin IDL.
plugin_acls = []
for row in _plugin_nb_ovn._tables['Logical_Switch'].rows.values():
@ -1181,7 +1186,24 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
for acl in getattr(row, 'acls', []):
monitor_acls.append(self._build_acl_to_compare(acl))
self.ovn_log_driver.add_logging_options_to_acls(monitor_acls,
self.ctx)
self.ovn_log_driver.add_logging_options_to_acls(plugin_acls,
self.ctx)
# Values taken out from list for comparison, since ACLs from OVN DB
# have certain values on a list of just one object
if should_match:
for acl in plugin_acls:
if isinstance(acl['severity'], list) and acl['severity']:
acl['severity'] = acl['severity'][0]
acl['name'] = acl['name'][0]
acl['meter'] = acl['meter'][0]
for acl in monitor_acls:
if isinstance(acl['severity'], list) and acl['severity']:
acl['severity'] = acl['severity'][0]
acl['name'] = acl['name'][0]
acl['meter'] = acl['meter'][0]
self.assertCountEqual(db_acls, plugin_acls)
self.assertCountEqual(db_acls, monitor_acls)
else:
@ -1777,13 +1799,25 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
self.assertIn('security_group_rule', sgr)
return sgr['security_group_rule']['id']
def test_sync_acls(self):
def _test_sync_acls_helper(self, test_log=False,
log_event=log_const.ALL_EVENT):
data = {'security_group': {'name': 'sg1'}}
sg_req = self.new_create_request('security-groups', data)
res = sg_req.get_response(self.api)
sg = self.deserialize(self.fmt, res)['security_group']
sgr_ids = []
# If we are going to test ACLs with log enabled, set up a log object
if test_log:
log_data = {'log': {'project_id': self.ctx.project_id,
'resource_type': 'security_group',
'description': 'test net log',
'name': 'logme',
'enabled': True,
'event': log_event}}
log_obj = self.log_plugin.create_log(self.ctx, log_data)
for tcp_port in range(8050, 8055):
sgr_ids.append(self._create_security_group_rule(
sg['id'], 'ingress', tcp_port))
@ -1814,6 +1848,21 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
nb_synchronizer.sync_acls(ctx)
self._validate_acls()
# Remove log object to avoid overlapping
if test_log:
log_obj = self.log_plugin.delete_log(self.ctx, log_obj['id'])
def test_sync_acls(self):
self._test_sync_acls_helper()
def test_sync_acls_with_logging(self):
self._test_sync_acls_helper(test_log=True,
log_event=log_const.ACCEPT_EVENT)
self._test_sync_acls_helper(test_log=True,
log_event=log_const.ALL_EVENT)
self._test_sync_acls_helper(test_log=True,
log_event=log_const.DROP_EVENT)
class TestOvnSbSync(base.TestOVNFunctionalBase):

View File

@ -60,7 +60,8 @@ class TestACLs(base.BaseTestCase):
'lport': self.fake_port['id'],
'lswitch': 'neutron-network_id1',
'match': 'outport == "fake_port_id1" && ip',
'priority': 1001}
'priority': 1001,
'meter': []}
acl_from_lport = {'action': 'drop', 'direction': 'from-lport',
'external_ids': {'neutron:lport':
self.fake_port['id']},
@ -68,7 +69,8 @@ class TestACLs(base.BaseTestCase):
'lport': self.fake_port['id'],
'lswitch': 'neutron-network_id1',
'match': 'inport == "fake_port_id1" && ip',
'priority': 1001}
'priority': 1001,
'meter': []}
for acl in acls:
if 'to-lport' in acl.values():
self.assertEqual(acl_to_lport, acl)

View File

@ -16,6 +16,8 @@ import collections
from unittest import mock
from neutron_lib import constants as const
from neutron_lib.services.logapi import constants as log_const
from oslo_utils import uuidutils
from neutron.common.ovn import acl
from neutron.common.ovn import constants as ovn_const
@ -26,6 +28,7 @@ from neutron.services.ovn_l3 import plugin as ovn_plugin
from neutron.tests.unit import fake_resources as fakes
from neutron.tests.unit.plugins.ml2.drivers.ovn.mech_driver import \
test_mech_driver
from neutron.tests.unit.services.logapi.drivers.ovn import test_driver
OvnPortInfo = collections.namedtuple('OvnPortInfo', ['name'])
@ -39,6 +42,7 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
# We want metadata enabled to increase coverage
super().setUp(enable_metadata=True)
self.test_log_driver = test_driver.TestOVNDriver()
self.subnet = {'cidr': '10.0.0.0/24',
'id': 'subnet1',
'subnetpool_id': None,
@ -227,7 +231,10 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
'lswitch': 'lswitch1', 'lport': 'lport1'}],
'lport2':
[{'id': 'acl2', 'priority': 00, 'policy': 'drop',
'lswitch': 'lswitch2', 'lport': 'lport2'}],
'lswitch': 'lswitch2', 'lport': 'lport2'},
{'id': 'aclr3', 'priority': 00, 'log': True,
'policy': 'drop', 'lswitch': 'lswitch2',
'meter': 'acl_log_meter', 'label': 1, 'lport': 'lport2'}],
# ACLs need to be kept as-is by the sync tool
'p2n2':
[{'lport': 'p2n2', 'direction': 'to-lport',
@ -384,7 +391,7 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
return {'r1': ['172.16.0.0/24', '172.16.2.0/24'],
'r2': ['192.168.2.0/24']}.get(router_id, [])
def _test_mocks_helper(self, ovn_nb_synchronizer):
def _test_mocks_helper(self, ovn_nb_synchronizer, test_logging=False):
core_plugin = ovn_nb_synchronizer.core_plugin
ovn_api = ovn_nb_synchronizer.ovn_api
ovn_driver = ovn_nb_synchronizer.ovn_driver
@ -429,12 +436,25 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
# 4 acls are returned as current ovn acls,
# two of which will match with neutron.
# So, in this example 17 will be added, 2 removed
core_plugin.get_ports = mock.Mock()
core_plugin.get_ports.side_effect = get_ports()
mock.patch.object(acl, '_get_subnet_from_cache',
return_value=self.subnet).start()
mock.patch.object(acl, 'acl_remote_group_id',
side_effect=self.matches).start()
if test_logging:
log_objs = [self.test_log_driver._fake_log_obj(
event=log_const.DROP_EVENT, resource_id=None, id='1111')]
mock.patch.object(ovn_nb_synchronizer.ovn_log_driver, '_get_logs',
return_value=log_objs).start()
mock.patch.object(ovn_nb_synchronizer.ovn_log_driver,
'_pgs_from_log_obj', return_value=[
{'name': 'neutron_pg_drop',
'external_ids': {},
'acls': [uuidutils.generate_uuid()]}]
).start()
core_plugin.get_security_group = mock.MagicMock(
side_effect=self.security_groups)
ovn_nb_synchronizer.get_acls = mock.Mock()
@ -580,8 +600,9 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
delete_dhcp_options_list,
add_port_groups_list,
del_port_groups_list,
create_metadata_list):
self._test_mocks_helper(ovn_nb_synchronizer)
create_metadata_list,
test_logging=False):
self._test_mocks_helper(ovn_nb_synchronizer, test_logging)
ovn_api = ovn_nb_synchronizer.ovn_api
mock.patch.object(impl_idl_ovn.OvsdbNbOvnIdl, 'from_worker').start()
@ -764,7 +785,14 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
ovn_api.delete_dhcp_options.assert_has_calls(
delete_dhcp_options_calls, any_order=True)
def test_ovn_nb_sync_mode_repair(self):
if test_logging:
# 2 times when doing add_logging_options_to_acls and then
# 2 times because of the add_label_related used 2 times for the
# from-port and to-port drop acls
self.assertEqual(4, ovn_nb_synchronizer.ovn_log_driver.
_pgs_from_log_obj.call_count)
def _test_ovn_nb_sync_mode_repair(self, test_logging=False):
create_network_list = [{'net': {'id': 'n2', 'mtu': 1450},
'ext_ids': {}}]
@ -892,7 +920,14 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
delete_dhcp_options_list,
add_port_groups_list,
del_port_groups_list,
create_metadata_list)
create_metadata_list,
test_logging)
def test_ovn_nb_sync_mode_repair(self):
self._test_ovn_nb_sync_mode_repair(test_logging=False)
def test_ovn_nb_sync_mode_repair_logs_created(self):
self._test_ovn_nb_sync_mode_repair(test_logging=True)
def test_ovn_nb_sync_mode_log(self):
create_network_list = []

View File

@ -328,3 +328,48 @@ class TestOVNDriver(TestOVNDriverBase):
self.assertEqual(len(pg_dict["acls"]), info_args[2])
self.assertEqual(log_name, info_args[3])
self.assertEqual(1, self._nb_ovn.db_set.call_count)
def test_add_label_related(self):
mock.patch.object(self._log_driver, '_pgs_from_log_obj', return_value=[
{'name': 'neutron_pg_drop',
'external_ids': {},
'acls': [uuidutils.generate_uuid()]}]).start()
neutron_acl = {'port_group': 'neutron_pg_drop',
'priority': 1001,
'action': 'drop',
'log': True,
'name': '',
'severity': 'info',
'direction': 'to-lport',
'match': 'outport == @neutron_pg_drop && ip'}
log_objs = [self._fake_log_obj(event=log_const.DROP_EVENT)]
with mock.patch.object(self._log_driver, '_get_logs',
return_value=log_objs):
self._log_driver.add_label_related(neutron_acl, self.context)
self.assertNotEqual(neutron_acl['label'], 0)
def test_add_logging_options_to_acls(self):
mock.patch.object(self._log_driver, '_pgs_from_log_obj', return_value=[
{'name': 'neutron_pg_drop', 'external_ids': {},
'acls': [uuidutils.generate_uuid()]}]).start()
n_acls = [{'port_group': 'neutron_pg_drop',
'priority': 1001,
'action': 'drop',
'log': False,
'name': '',
'severity': '',
'direction': 'to-lport',
'match': 'outport == @neutron_pg_drop && ip'}]
log_objs = [self._fake_log_obj(event=log_const.DROP_EVENT,
resource_id=None,
id='1111')]
with mock.patch.object(self._log_driver, '_get_logs',
return_value=log_objs):
self._log_driver.add_logging_options_to_acls(n_acls, self.context)
for acl in n_acls:
self.assertEqual(acl['severity'], 'info')
self.assertTrue(acl['log'])
self.assertEqual(acl['name'],
ovn_utils.ovn_name(log_objs[0].id))
self.assertEqual(acl['meter'], self._log_driver.meter_name)