Merge "Consider logging options when using OVNdbsync"

This commit is contained in:
Zuul
2025-06-13 11:12:43 +00:00
committed by Gerrit Code Review
9 changed files with 302 additions and 39 deletions

View File

@ -148,7 +148,8 @@ def add_acls_for_drop_port_group(pg_name):
"name": [],
"severity": [],
"direction": direction,
"match": f'{p} == @{pg_name} && ip'}
"match": f'{p} == @{pg_name} && ip',
"meter": []}
acl_list.append(acl)
return acl_list
@ -167,6 +168,7 @@ def drop_all_ip_traffic_for_port(port):
"severity": [],
"direction": direction,
"match": '{} == "{}" && ip'.format(p, port['id']),
"meter": [],
"external_ids": {'neutron:lport': port['id']}}
acl_list.append(acl)
return acl_list
@ -187,6 +189,7 @@ def add_sg_rule_acl_for_port_group(port_group, r, stateful, match):
"severity": [],
"direction": dir_map[r['direction']],
"match": match,
"meter": [],
ovn_const.OVN_SG_RULE_EXT_ID_KEY: r['id']}
return acl

View File

@ -259,7 +259,7 @@ INITIAL_REV_NUM = -1
ACL_EXPECTED_COLUMNS_NBDB = (
'external_ids', 'direction', 'log', 'priority',
'name', 'action', 'severity', 'match')
'name', 'action', 'severity', 'match', 'meter')
# Resource types
TYPE_NETWORKS = 'networks'

View File

@ -35,6 +35,7 @@ from neutron.objects.port_forwarding import PortForwarding
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb.extensions import qos \
as ovn_qos
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client
from neutron.services.logapi.drivers.ovn import driver as log_driver
from neutron.services.segments import db as segments_db
@ -82,6 +83,18 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
self.segments_plugin = (
manager.NeutronManager.load_class_for_provider(
'neutron.service_plugins', 'segments')())
self.log_plugin = directory.get_plugin(plugin_constants.LOG_API)
if not self.log_plugin:
self.log_plugin = (
manager.NeutronManager.load_class_for_provider(
'neutron.service_plugins', 'log')())
directory.add_plugin(plugin_constants.LOG_API, self.log_plugin)
for driver in self.log_plugin.driver_manager.drivers:
if driver.name == "ovn":
self.ovn_log_driver = driver
if not hasattr(self, 'ovn_log_driver'):
self.ovn_log_driver = log_driver.OVNDriver()
self.log_plugin.driver_manager.register_driver(self.ovn_log_driver)
def stop(self):
if utils.is_ovn_l3(self.l3_plugin):
@ -233,6 +246,9 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
def _get_acls_from_port_groups(self):
ovn_acls = []
# Options and label columns are only present for OVN >= 22.03.
# Furthermore label is a randint so it cannot be compared with any
# expected neutron value. They are added later on the ACL addition.
acl_columns = (self.ovn_api._tables['ACL'].columns.keys() &
set(ovn_const.ACL_EXPECTED_COLUMNS_NBDB))
acl_columns.discard('external_ids')
@ -244,7 +260,16 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
acl_string['port_group'] = pg.name
if id_key in acl.external_ids:
acl_string[id_key] = acl.external_ids[id_key]
# This properties are present as lists of one item,
# converting them to string.
if acl_string['name']:
acl_string['name'] = acl_string['name'][0]
if acl_string['meter']:
acl_string['meter'] = acl_string['meter'][0]
if acl_string['severity']:
acl_string['severity'] = acl_string['severity'][0]
ovn_acls.append(acl_string)
return ovn_acls
def sync_acls(self, ctx):
@ -274,6 +299,10 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
neutron_default_acls = acl_utils.add_acls_for_drop_port_group(
ovn_const.OVN_DROP_PORT_GROUP_NAME)
# Add logging options
self.ovn_log_driver.add_logging_options_to_acls(neutron_acls, ctx)
self.ovn_log_driver.add_logging_options_to_acls(neutron_default_acls,
ctx)
ovn_acls = self._get_acls_from_port_groups()
# Sort the acls in the ovn database according to the security
# group rule id for easy comparison in the future.
@ -304,14 +333,24 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
o_index += 1
elif n_id == o_id:
if any(item not in na.items() for item in oa.items()):
for item in oa.items():
if item not in na.items():
LOG.warning('Property %(item)s from OVN ACL not '
'found in Neutron ACL: %(n_acl)s',
{'item': item,
'n_acl': na})
add_acls.append(na)
remove_acls.append(oa)
n_index += 1
o_index += 1
elif n_id > o_id:
LOG.warning('ACL should not be present in OVN, removing'
'%(acl)s', {'acl': oa})
remove_acls.append(oa)
o_index += 1
else:
LOG.warning('ACL should be present in OVN but is not, adding:'
'%(acl)s', {'acl': na})
add_acls.append(na)
n_index += 1
@ -351,32 +390,6 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
if (self.mode == ovn_const.OVN_DB_SYNC_MODE_REPAIR and
(num_acls_to_add or num_acls_to_remove)):
one_time_pg_resync = True
while True:
try:
with self.ovn_api.transaction(check_error=True) as txn:
for acla in neutron_acls:
LOG.warning('ACL found in Neutron but not in '
'OVN NB DB for port group %s',
acla['port_group'])
txn.add(self.ovn_api.pg_acl_add(
**acla, may_exist=True))
except idlutils.RowNotFound as row_err:
if row_err.msg.startswith("Cannot find Port_Group"):
if one_time_pg_resync:
LOG.warning('Port group row was not found during '
'ACLs sync. Will attempt to sync port '
'groups one more time. The caught '
'exception is: %s', row_err)
self.sync_port_groups(ctx)
one_time_pg_resync = False
continue
LOG.error('Port group exception during ACL sync '
'even after one more port group resync. '
'The caught exception is: %s', row_err)
else:
raise
break
with self.ovn_api.transaction(check_error=True) as txn:
for aclr in ovn_acls:
LOG.warning('ACLs found in OVN NB DB but not in '
@ -393,6 +406,41 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
LOG.warning('Removing ACLs from OVN from Logical '
'Switch %s', aclr[0])
txn.add(self.ovn_api.acl_del(aclr[0]))
while True:
try:
with self.ovn_api.transaction(check_error=True) as txn:
for acla in neutron_acls:
LOG.warning('ACL found in Neutron but not in '
'OVN NB DB for port group %s',
acla['port_group'])
acl = txn.add(self.ovn_api.pg_acl_add(**acla,
may_exist=True))
# We need to do this now since label should be
# random and not 0. We can use options as a way
# to see if label is supported or not.
if acla.get('log'):
self.ovn_log_driver.add_label_related(acla,
ctx)
txn.add(self.ovn_api.db_set('ACL', acl,
label=acla['label'],
options=acla['options']))
except idlutils.RowNotFound as row_err:
if row_err.msg.startswith("Cannot find Port_Group"):
if one_time_pg_resync:
LOG.warning('Port group row was not found during '
'ACLs sync. Will attempt to sync port '
'groups one more time. The caught '
'exception is: %s', row_err)
self.sync_port_groups(ctx)
one_time_pg_resync = False
continue
LOG.error('Port group exception during ACL sync '
'even after one more port group resync. '
'The caught exception is: %s', row_err)
else:
raise
break
LOG.debug('OVN-NB Sync ACLs completed @ %s', str(datetime.now()))

View File

@ -409,6 +409,80 @@ class OVNDriver(base.DriverBase):
with self.ovn_nb.transaction(check_error=True) as ovn_txn:
self._update_log_objs(context, ovn_txn, log_objs)
def add_logging_options_to_acls(self, neutron_acls, context):
log_objs = self._get_logs(context)
for log_obj in log_objs:
pgs = self._pgs_from_log_obj(context, log_obj)
actions_enabled = self._acl_actions_enabled(log_obj)
self._set_neutron_acls_log(pgs, context, actions_enabled,
utils.ovn_name(log_obj.id),
neutron_acls)
# This function is a version of set_acls_log meant to change neutron
# defined acls, mostly thought for ovndbsync consistency check.
def _set_neutron_acls_log(self, pgs, context, actions_enabled, log_name,
neutron_acls):
acl_changes, acl_visits = 0, 0
for pg in pgs:
meter_name = self.meter_name
if pg['name'] != ovn_const.OVN_DROP_PORT_GROUP_NAME:
sg = sg_obj.SecurityGroup.get_sg_by_id(context,
pg['external_ids'][ovn_const.OVN_SG_EXT_ID_KEY])
if not sg:
LOG.warning("Port Group %s is missing a corresponding "
"security group, skipping its network log "
"setting...", pg["name"])
continue
if not sg.stateful:
meter_name = meter_name + ("_stateless")
# We need to get the OVN ACL because UUID is not listed as a
# property on neutron defined ACLs (and it shouldn't), so we need
# to check which ACL is that UUID referring to, using match as
# differentiating value.
for acl in neutron_acls:
acl_visits += 1
# skip acls used by a different network log
n_acl_name = acl['name']
if n_acl_name and n_acl_name != log_name:
continue
action = acl['action'] in actions_enabled
acl['log'] = action
acl['meter'] = meter_name
acl['name'] = log_name
acl['severity'] = "info"
if acl.get('options'):
acl["options"] = {'log-related': "true"}
# label is not set because the actual number should not
# be compared or taken into account, we only need it to be
# different from 0.
acl_changes += 1
LOG.info("Set %d (out of %d visited) Neutron ACLs for network log %s",
acl_changes, acl_visits, log_name)
def _get_all_log_pgs(self, ctx):
"""Get all Port Group names associated to a Log Object.
:param log_plugin: Currently loaded log_plugging.
:param ctx: current running context information
"""
log_objs = self._get_logs(ctx)
log_pgs = []
for log_obj in log_objs:
log_pgs.extend(self._pgs_from_log_obj(ctx, log_obj))
return log_pgs
def add_label_related(self, n_acl, ctx):
# Get acls to be able to check if label is present in OVN ACLs and
# also check old label value for ACL if it was already present.
acls = [acl for pg in self._get_all_log_pgs(ctx) for acl in pg["acls"]]
if not acls:
return
acl = self.ovn_nb.lookup("ACL", acls[0], default=None)
if not hasattr(acl, 'label'):
return
n_acl["label"] = secrets.SystemRandom().randrange(1, MAX_INT_LABEL)
n_acl["options"] = {'log-related': 'true'}
def register(plugin_driver):
"""Register the driver."""

View File

@ -50,6 +50,7 @@ from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb.extensions import \
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
from neutron.plugins.ml2.drivers import type_geneve # noqa
from neutron import service # noqa
from neutron.services.logapi.drivers.ovn import driver as log_driver
from neutron.tests import base
from neutron.tests.common import base as common_base
from neutron.tests.common import helpers
@ -175,6 +176,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
self.addCleanup(exts.PluginAwareExtensionManager.clear_instance)
self.ovsdb_server_mgr = None
self._service_plugins = service_plugins
log_driver.DRIVER = None
super().setUp()
self.test_log_dir = os.path.join(DEFAULT_LOG_DIR, self.id())
base.setup_test_logging(
@ -197,6 +199,11 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
self.mech_driver.log_driver)
self.mech_driver.log_driver.plugin_driver = self.mech_driver
self.mech_driver.log_driver._log_plugin_property = None
for driver in self.log_plugin.driver_manager.drivers:
if driver.name == "ovn":
self.ovn_log_driver = driver
if not hasattr(self, 'ovn_log_driver'):
self.ovn_log_driver = log_driver.OVNDriver()
self.ovn_northd_mgr = None
self.maintenance_worker = maintenance_worker
mock.patch(

View File

@ -24,6 +24,7 @@ from neutron_lib.api.definitions import port_security as ps
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib.services.logapi import constants as log_const
from neutron_lib.services.qos import constants as qos_const
from oslo_config import cfg
from oslo_utils import uuidutils
@ -107,7 +108,7 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
nb_idl=self.nb_api)
def get_additional_service_plugins(self):
return {'qos': 'qos', 'segments': 'segments'}
return {'qos': 'qos', 'segments': 'segments', 'log': 'log'}
def _api_for_resource(self, resource):
if resource in ['security-groups']:
@ -1080,7 +1081,7 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
@staticmethod
def _build_acl_for_pgs(priority, direction, log, name, action,
severity, match, port_group, **kwargs):
severity, match, meter, port_group, **kwargs):
return {
'priority': priority,
'direction': direction,
@ -1089,6 +1090,7 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
'action': action,
'severity': severity,
'match': match,
'meter': meter,
'external_ids': kwargs}
def _validate_dhcp_opts(self, should_match=True):
@ -1161,6 +1163,9 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
ovn_const.OVN_DROP_PORT_GROUP_NAME):
db_acls.append(TestOvnNbSync._build_acl_for_pgs(**acl))
self.ovn_log_driver.add_logging_options_to_acls(db_acls,
self.ctx)
# Get the list of ACLs stored in the OVN plugin IDL.
plugin_acls = []
for row in _plugin_nb_ovn._tables['Logical_Switch'].rows.values():
@ -1181,7 +1186,24 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
for acl in getattr(row, 'acls', []):
monitor_acls.append(self._build_acl_to_compare(acl))
self.ovn_log_driver.add_logging_options_to_acls(monitor_acls,
self.ctx)
self.ovn_log_driver.add_logging_options_to_acls(plugin_acls,
self.ctx)
# Values taken out from list for comparison, since ACLs from OVN DB
# have certain values on a list of just one object
if should_match:
for acl in plugin_acls:
if isinstance(acl['severity'], list) and acl['severity']:
acl['severity'] = acl['severity'][0]
acl['name'] = acl['name'][0]
acl['meter'] = acl['meter'][0]
for acl in monitor_acls:
if isinstance(acl['severity'], list) and acl['severity']:
acl['severity'] = acl['severity'][0]
acl['name'] = acl['name'][0]
acl['meter'] = acl['meter'][0]
self.assertCountEqual(db_acls, plugin_acls)
self.assertCountEqual(db_acls, monitor_acls)
else:
@ -1777,13 +1799,25 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
self.assertIn('security_group_rule', sgr)
return sgr['security_group_rule']['id']
def test_sync_acls(self):
def _test_sync_acls_helper(self, test_log=False,
log_event=log_const.ALL_EVENT):
data = {'security_group': {'name': 'sg1'}}
sg_req = self.new_create_request('security-groups', data)
res = sg_req.get_response(self.api)
sg = self.deserialize(self.fmt, res)['security_group']
sgr_ids = []
# If we are going to test ACLs with log enabled, set up a log object
if test_log:
log_data = {'log': {'project_id': self.ctx.project_id,
'resource_type': 'security_group',
'description': 'test net log',
'name': 'logme',
'enabled': True,
'event': log_event}}
log_obj = self.log_plugin.create_log(self.ctx, log_data)
for tcp_port in range(8050, 8055):
sgr_ids.append(self._create_security_group_rule(
sg['id'], 'ingress', tcp_port))
@ -1814,6 +1848,21 @@ class TestOvnNbSync(testlib_api.MySQLTestCaseMixin,
nb_synchronizer.sync_acls(ctx)
self._validate_acls()
# Remove log object to avoid overlapping
if test_log:
log_obj = self.log_plugin.delete_log(self.ctx, log_obj['id'])
def test_sync_acls(self):
self._test_sync_acls_helper()
def test_sync_acls_with_logging(self):
self._test_sync_acls_helper(test_log=True,
log_event=log_const.ACCEPT_EVENT)
self._test_sync_acls_helper(test_log=True,
log_event=log_const.ALL_EVENT)
self._test_sync_acls_helper(test_log=True,
log_event=log_const.DROP_EVENT)
class TestOvnSbSync(base.TestOVNFunctionalBase):

View File

@ -60,7 +60,8 @@ class TestACLs(base.BaseTestCase):
'lport': self.fake_port['id'],
'lswitch': 'neutron-network_id1',
'match': 'outport == "fake_port_id1" && ip',
'priority': 1001}
'priority': 1001,
'meter': []}
acl_from_lport = {'action': 'drop', 'direction': 'from-lport',
'external_ids': {'neutron:lport':
self.fake_port['id']},
@ -68,7 +69,8 @@ class TestACLs(base.BaseTestCase):
'lport': self.fake_port['id'],
'lswitch': 'neutron-network_id1',
'match': 'inport == "fake_port_id1" && ip',
'priority': 1001}
'priority': 1001,
'meter': []}
for acl in acls:
if 'to-lport' in acl.values():
self.assertEqual(acl_to_lport, acl)

View File

@ -16,6 +16,8 @@ import collections
from unittest import mock
from neutron_lib import constants as const
from neutron_lib.services.logapi import constants as log_const
from oslo_utils import uuidutils
from neutron.common.ovn import acl
from neutron.common.ovn import constants as ovn_const
@ -26,6 +28,7 @@ from neutron.services.ovn_l3 import plugin as ovn_plugin
from neutron.tests.unit import fake_resources as fakes
from neutron.tests.unit.plugins.ml2.drivers.ovn.mech_driver import \
test_mech_driver
from neutron.tests.unit.services.logapi.drivers.ovn import test_driver
OvnPortInfo = collections.namedtuple('OvnPortInfo', ['name'])
@ -39,6 +42,7 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
# We want metadata enabled to increase coverage
super().setUp(enable_metadata=True)
self.test_log_driver = test_driver.TestOVNDriver()
self.subnet = {'cidr': '10.0.0.0/24',
'id': 'subnet1',
'subnetpool_id': None,
@ -227,7 +231,10 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
'lswitch': 'lswitch1', 'lport': 'lport1'}],
'lport2':
[{'id': 'acl2', 'priority': 00, 'policy': 'drop',
'lswitch': 'lswitch2', 'lport': 'lport2'}],
'lswitch': 'lswitch2', 'lport': 'lport2'},
{'id': 'aclr3', 'priority': 00, 'log': True,
'policy': 'drop', 'lswitch': 'lswitch2',
'meter': 'acl_log_meter', 'label': 1, 'lport': 'lport2'}],
# ACLs need to be kept as-is by the sync tool
'p2n2':
[{'lport': 'p2n2', 'direction': 'to-lport',
@ -384,7 +391,7 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
return {'r1': ['172.16.0.0/24', '172.16.2.0/24'],
'r2': ['192.168.2.0/24']}.get(router_id, [])
def _test_mocks_helper(self, ovn_nb_synchronizer):
def _test_mocks_helper(self, ovn_nb_synchronizer, test_logging=False):
core_plugin = ovn_nb_synchronizer.core_plugin
ovn_api = ovn_nb_synchronizer.ovn_api
ovn_driver = ovn_nb_synchronizer.ovn_driver
@ -429,12 +436,25 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
# 4 acls are returned as current ovn acls,
# two of which will match with neutron.
# So, in this example 17 will be added, 2 removed
core_plugin.get_ports = mock.Mock()
core_plugin.get_ports.side_effect = get_ports()
mock.patch.object(acl, '_get_subnet_from_cache',
return_value=self.subnet).start()
mock.patch.object(acl, 'acl_remote_group_id',
side_effect=self.matches).start()
if test_logging:
log_objs = [self.test_log_driver._fake_log_obj(
event=log_const.DROP_EVENT, resource_id=None, id='1111')]
mock.patch.object(ovn_nb_synchronizer.ovn_log_driver, '_get_logs',
return_value=log_objs).start()
mock.patch.object(ovn_nb_synchronizer.ovn_log_driver,
'_pgs_from_log_obj', return_value=[
{'name': 'neutron_pg_drop',
'external_ids': {},
'acls': [uuidutils.generate_uuid()]}]
).start()
core_plugin.get_security_group = mock.MagicMock(
side_effect=self.security_groups)
ovn_nb_synchronizer.get_acls = mock.Mock()
@ -580,8 +600,9 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
delete_dhcp_options_list,
add_port_groups_list,
del_port_groups_list,
create_metadata_list):
self._test_mocks_helper(ovn_nb_synchronizer)
create_metadata_list,
test_logging=False):
self._test_mocks_helper(ovn_nb_synchronizer, test_logging)
ovn_api = ovn_nb_synchronizer.ovn_api
mock.patch.object(impl_idl_ovn.OvsdbNbOvnIdl, 'from_worker').start()
@ -764,7 +785,14 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
ovn_api.delete_dhcp_options.assert_has_calls(
delete_dhcp_options_calls, any_order=True)
def test_ovn_nb_sync_mode_repair(self):
if test_logging:
# 2 times when doing add_logging_options_to_acls and then
# 2 times because of the add_label_related used 2 times for the
# from-port and to-port drop acls
self.assertEqual(4, ovn_nb_synchronizer.ovn_log_driver.
_pgs_from_log_obj.call_count)
def _test_ovn_nb_sync_mode_repair(self, test_logging=False):
create_network_list = [{'net': {'id': 'n2', 'mtu': 1450},
'ext_ids': {}}]
@ -892,7 +920,14 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
delete_dhcp_options_list,
add_port_groups_list,
del_port_groups_list,
create_metadata_list)
create_metadata_list,
test_logging)
def test_ovn_nb_sync_mode_repair(self):
self._test_ovn_nb_sync_mode_repair(test_logging=False)
def test_ovn_nb_sync_mode_repair_logs_created(self):
self._test_ovn_nb_sync_mode_repair(test_logging=True)
def test_ovn_nb_sync_mode_log(self):
create_network_list = []

View File

@ -328,3 +328,48 @@ class TestOVNDriver(TestOVNDriverBase):
self.assertEqual(len(pg_dict["acls"]), info_args[2])
self.assertEqual(log_name, info_args[3])
self.assertEqual(1, self._nb_ovn.db_set.call_count)
def test_add_label_related(self):
mock.patch.object(self._log_driver, '_pgs_from_log_obj', return_value=[
{'name': 'neutron_pg_drop',
'external_ids': {},
'acls': [uuidutils.generate_uuid()]}]).start()
neutron_acl = {'port_group': 'neutron_pg_drop',
'priority': 1001,
'action': 'drop',
'log': True,
'name': '',
'severity': 'info',
'direction': 'to-lport',
'match': 'outport == @neutron_pg_drop && ip'}
log_objs = [self._fake_log_obj(event=log_const.DROP_EVENT)]
with mock.patch.object(self._log_driver, '_get_logs',
return_value=log_objs):
self._log_driver.add_label_related(neutron_acl, self.context)
self.assertNotEqual(neutron_acl['label'], 0)
def test_add_logging_options_to_acls(self):
mock.patch.object(self._log_driver, '_pgs_from_log_obj', return_value=[
{'name': 'neutron_pg_drop', 'external_ids': {},
'acls': [uuidutils.generate_uuid()]}]).start()
n_acls = [{'port_group': 'neutron_pg_drop',
'priority': 1001,
'action': 'drop',
'log': False,
'name': '',
'severity': '',
'direction': 'to-lport',
'match': 'outport == @neutron_pg_drop && ip'}]
log_objs = [self._fake_log_obj(event=log_const.DROP_EVENT,
resource_id=None,
id='1111')]
with mock.patch.object(self._log_driver, '_get_logs',
return_value=log_objs):
self._log_driver.add_logging_options_to_acls(n_acls, self.context)
for acl in n_acls:
self.assertEqual(acl['severity'], 'info')
self.assertTrue(acl['log'])
self.assertEqual(acl['name'],
ovn_utils.ovn_name(log_objs[0].id))
self.assertEqual(acl['meter'], self._log_driver.meter_name)