ovn: Remove UpdateACLsCommand

This patch removes unused UpdateACLsCommand which was previously used by the
code using Address Sets but that has been already removed. OpenStack API
doesn't implement update operation for security group rules so this
command is not needed anymore.

Change-Id: Iea4a82a21d32a064ff5dd530e104a2de2efb46b5
Signed-off-by: Jakub Libosvar <libosvar@redhat.com>
This commit is contained in:
Jakub Libosvar 2020-06-10 15:07:19 +00:00
parent 9cbbd8de53
commit e748f3f2d8
5 changed files with 0 additions and 535 deletions

View File

@ -171,23 +171,6 @@ def drop_all_ip_traffic_for_port(port):
return acl_list
def add_sg_rule_acl_for_port(port, r, match):
dir_map = {const.INGRESS_DIRECTION: 'to-lport',
const.EGRESS_DIRECTION: 'from-lport'}
acl = {"lswitch": utils.ovn_name(port['network_id']),
"lport": port['id'],
"priority": ovn_const.ACL_PRIORITY_ALLOW,
"action": ovn_const.ACL_ACTION_ALLOW_RELATED,
"log": False,
"name": [],
"severity": [],
"direction": dir_map[r['direction']],
"match": match,
"external_ids": {'neutron:lport': port['id'],
ovn_const.OVN_SG_RULE_EXT_ID_KEY: r['id']}}
return acl
def add_sg_rule_acl_for_port_group(port_group, r, match):
dir_map = {const.INGRESS_DIRECTION: 'to-lport',
const.EGRESS_DIRECTION: 'from-lport'}

View File

@ -465,168 +465,6 @@ class DelACLCommand(command.BaseCommand):
_updatevalues_in_list(lswitch, 'acls', old_values=acls_to_del)
class UpdateACLsCommand(command.BaseCommand):
def __init__(self, api, lswitch_names, port_list, acl_new_values_dict,
need_compare=True, is_add_acl=True):
"""This command updates the acl list for the logical switches
@param lswitch_names: List of Logical Switch Names
@type lswitch_names: []
@param port_list: Iterator of List of Ports
@type port_list: []
@param acl_new_values_dict: Dictionary of acls indexed by port id
@type acl_new_values_dict: {}
@need_compare: If acl_new_values_dict needs be compared with existing
acls.
@type: Boolean.
@is_add_acl: If updating is caused by acl adding action.
@type: Boolean.
"""
super(UpdateACLsCommand, self).__init__(api)
self.lswitch_names = lswitch_names
self.port_list = port_list
self.acl_new_values_dict = acl_new_values_dict
self.need_compare = need_compare
self.is_add_acl = is_add_acl
def _acl_list_sub(self, acl_list1, acl_list2):
"""Compute the elements in acl_list1 but not in acl_list2.
If acl_list1 and acl_list2 were sets, the result of this routine
could be thought of as acl_list1 - acl_list2. Note that acl_list1
and acl_list2 cannot actually be sets as they contain dictionary
items i.e. set([{'a':1}) doesn't work.
"""
acl_diff = []
for acl in acl_list1:
if acl not in acl_list2:
acl_diff.append(acl)
return acl_diff
def _compute_acl_differences(self, port_list, acl_old_values_dict,
acl_new_values_dict, acl_obj_dict):
"""Compute the difference between the new and old sets of acls
@param port_list: Iterator of a List of ports
@type port_list: []
@param acl_old_values_dict: Dictionary of old acl values indexed
by port id
@param acl_new_values_dict: Dictionary of new acl values indexed
by port id
@param acl_obj_dict: Dictionary of acl objects indexed by the acl
value in string format.
@var acl_del_objs_dict: Dictionary of acl objects to be deleted
indexed by the lswitch.
@var acl_add_values_dict: Dictionary of acl values to be added
indexed by the lswitch.
@return: (acl_del_objs_dict, acl_add_values_dict)
@rtype: ({}, {})
"""
acl_del_objs_dict = {}
acl_add_values_dict = {}
for port in port_list:
lswitch_name = port['network_id']
acls_old = acl_old_values_dict.get(port['id'], [])
acls_new = acl_new_values_dict.get(port['id'], [])
acls_del = self._acl_list_sub(acls_old, acls_new)
acls_add = self._acl_list_sub(acls_new, acls_old)
acl_del_objs = acl_del_objs_dict.setdefault(lswitch_name, [])
for acl in acls_del:
acl_del_objs.append(acl_obj_dict[str(acl)])
acl_add_values = acl_add_values_dict.setdefault(lswitch_name, [])
for acl in acls_add:
# Remove lport and lswitch columns
del acl['lswitch']
del acl['lport']
acl_add_values.append(acl)
return acl_del_objs_dict, acl_add_values_dict
def _get_update_data_without_compare(self):
lswitch_ovsdb_dict = {}
for switch_name in self.lswitch_names:
switch_name = utils.ovn_name(switch_name)
lswitch = idlutils.row_by_value(self.api.idl, 'Logical_Switch',
'name', switch_name)
lswitch_ovsdb_dict[switch_name] = lswitch
if self.is_add_acl:
acl_add_values_dict = {}
for port in self.port_list:
switch_name = utils.ovn_name(port['network_id'])
if switch_name not in acl_add_values_dict:
acl_add_values_dict[switch_name] = []
if port['id'] in self.acl_new_values_dict:
acl_add_values_dict[switch_name].append(
self.acl_new_values_dict[port['id']])
acl_del_objs_dict = {}
else:
acl_add_values_dict = {}
acl_del_objs_dict = {}
del_acl_extids = []
for acl_dict in self.acl_new_values_dict.values():
del_acl_extids.append({acl_dict['match']:
acl_dict['external_ids']})
for switch_name, lswitch in lswitch_ovsdb_dict.items():
if switch_name not in acl_del_objs_dict:
acl_del_objs_dict[switch_name] = []
acls = getattr(lswitch, 'acls', [])
for acl in acls:
match = getattr(acl, 'match')
acl_extids = {match: getattr(acl, 'external_ids')}
if acl_extids in del_acl_extids:
acl_del_objs_dict[switch_name].append(acl)
return lswitch_ovsdb_dict, acl_del_objs_dict, acl_add_values_dict
def run_idl(self, txn):
if self.need_compare:
# Get all relevant ACLs in 1 shot
acl_values_dict, acl_obj_dict, lswitch_ovsdb_dict = (
self.api.get_acls_for_lswitches(self.lswitch_names))
# Compute the difference between the new and old set of ACLs
acl_del_objs_dict, acl_add_values_dict = (
self._compute_acl_differences(
self.port_list, acl_values_dict,
self.acl_new_values_dict, acl_obj_dict))
else:
lswitch_ovsdb_dict, acl_del_objs_dict, acl_add_values_dict = (
self._get_update_data_without_compare())
for lswitch_name, lswitch in lswitch_ovsdb_dict.items():
acl_del_objs = acl_del_objs_dict.get(lswitch_name, [])
acl_add_values = acl_add_values_dict.get(lswitch_name, [])
# Continue if no ACLs to add or delete.
if not acl_del_objs and not acl_add_values:
continue
# Delete old ACLs.
if acl_del_objs:
for acl_del_obj in acl_del_objs:
try:
acl_del_obj.delete()
except AssertionError:
# If we try to delete a row twice, just continue
pass
# Add new ACLs.
acl_add_objs = None
if acl_add_values:
acl_add_objs = []
for acl_value in acl_add_values:
row = txn.insert(self.api._tables['ACL'])
for col, val in acl_value.items():
setattr(row, col, val)
acl_add_objs.append(row.uuid)
# Update logical switch ACLs.
_updatevalues_in_list(lswitch, 'acls',
new_values=acl_add_objs,
old_values=acl_del_objs)
class AddStaticRouteCommand(command.BaseCommand):
def __init__(self, api, lrouter, **columns):
super(AddStaticRouteCommand, self).__init__(api)

View File

@ -359,13 +359,6 @@ class OvsdbNbOvnIdl(nb_impl_idl.OvnNbApiIdlImpl, Backend):
def delete_acl(self, lswitch, lport, if_exists=True):
return cmd.DelACLCommand(self, lswitch, lport, if_exists)
def update_acls(self, lswitch_names, port_list, acl_new_values_dict,
need_compare=True, is_add_acl=True):
return cmd.UpdateACLsCommand(self, lswitch_names,
port_list, acl_new_values_dict,
need_compare=need_compare,
is_add_acl=is_add_acl)
def add_static_route(self, lrouter, **columns):
return cmd.AddStaticRouteCommand(self, lrouter, **columns)

View File

@ -11,17 +11,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from unittest import mock
from neutron_lib import constants as const
from ovsdbapp.backend.ovs_idl import idlutils
from neutron.common.ovn import acl as ovn_acl
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils as ovn_utils
from neutron.conf.agent import securitygroups_rpc
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import commands as cmd
from neutron.tests import base
from neutron.tests.unit import fake_resources as fakes
@ -114,239 +111,6 @@ class TestACLs(base.BaseTestCase):
if 'from-lport' in acl.values():
self.assertEqual(acl_from_lport, acl)
def _test_add_sg_rule_acl_for_port(self, sg_rule, direction, match):
port = {'id': 'port-id',
'network_id': 'network-id'}
acl = ovn_acl.add_sg_rule_acl_for_port(port, sg_rule, match)
self.assertEqual({'lswitch': 'neutron-network-id',
'lport': 'port-id',
'priority': ovn_const.ACL_PRIORITY_ALLOW,
'action': ovn_const.ACL_ACTION_ALLOW_RELATED,
'log': False, 'name': [], 'severity': [],
'direction': direction,
'match': match,
'external_ids': {
'neutron:lport': 'port-id',
'neutron:security_group_rule_id': 'sgr_id'}},
acl)
def test_add_sg_rule_acl_for_port_remote_ip_prefix(self):
sg_rule = {'id': 'sgr_id',
'direction': 'ingress',
'ethertype': 'IPv4',
'remote_group_id': None,
'remote_ip_prefix': '1.1.1.0/24',
'protocol': None}
match = 'outport == "port-id" && ip4 && ip4.src == 1.1.1.0/24'
self._test_add_sg_rule_acl_for_port(sg_rule,
'to-lport',
match)
sg_rule['direction'] = 'egress'
match = 'inport == "port-id" && ip4 && ip4.dst == 1.1.1.0/24'
self._test_add_sg_rule_acl_for_port(sg_rule,
'from-lport',
match)
def test_add_sg_rule_acl_for_port_remote_group(self):
sg_rule = {'id': 'sgr_id',
'direction': 'ingress',
'ethertype': 'IPv4',
'remote_group_id': 'sg1',
'remote_ip_prefix': None,
'protocol': None}
match = 'outport == "port-id" && ip4 && (ip4.src == 1.1.1.100' \
' || ip4.src == 1.1.1.101' \
' || ip4.src == 1.1.1.102)'
self._test_add_sg_rule_acl_for_port(sg_rule,
'to-lport',
match)
sg_rule['direction'] = 'egress'
match = 'inport == "port-id" && ip4 && (ip4.dst == 1.1.1.100' \
' || ip4.dst == 1.1.1.101' \
' || ip4.dst == 1.1.1.102)'
self._test_add_sg_rule_acl_for_port(sg_rule,
'from-lport',
match)
def test__update_acls_compute_difference(self):
lswitch_name = 'lswitch-1'
port1 = {'id': 'port-id1',
'network_id': lswitch_name,
'fixed_ips': [{'subnet_id': 'subnet-id',
'ip_address': '1.1.1.101'},
{'subnet_id': 'subnet-id-v6',
'ip_address': '2001:0db8::1:0:0:1'}]}
port2 = {'id': 'port-id2',
'network_id': lswitch_name,
'fixed_ips': [{'subnet_id': 'subnet-id',
'ip_address': '1.1.1.102'},
{'subnet_id': 'subnet-id-v6',
'ip_address': '2001:0db8::1:0:0:2'}]}
ports = [port1, port2]
# OLD ACLs, allow IPv4 communication
aclport1_old1 = {'priority': 1002, 'direction': 'from-lport',
'lport': port1['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip4 && (ip.src == %s)' %
(port1['id'], port1['fixed_ips'][0]['ip_address'])}
aclport1_old2 = {'priority': 1002, 'direction': 'from-lport',
'lport': port1['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip6 && (ip.src == %s)' %
(port1['id'], port1['fixed_ips'][1]['ip_address'])}
aclport1_old3 = {'priority': 1002, 'direction': 'to-lport',
'lport': port1['id'], 'lswitch': lswitch_name,
'match': 'ip4 && (ip.src == %s)' %
(port2['fixed_ips'][0]['ip_address'])}
port1_acls_old = [aclport1_old1, aclport1_old2, aclport1_old3]
aclport2_old1 = {'priority': 1002, 'direction': 'from-lport',
'lport': port2['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip4 && (ip.src == %s)' %
(port2['id'], port2['fixed_ips'][0]['ip_address'])}
aclport2_old2 = {'priority': 1002, 'direction': 'from-lport',
'lport': port2['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip6 && (ip.src == %s)' %
(port2['id'], port2['fixed_ips'][1]['ip_address'])}
aclport2_old3 = {'priority': 1002, 'direction': 'to-lport',
'lport': port2['id'], 'lswitch': lswitch_name,
'match': 'ip4 && (ip.src == %s)' %
(port1['fixed_ips'][0]['ip_address'])}
port2_acls_old = [aclport2_old1, aclport2_old2, aclport2_old3]
acls_old_dict = {'%s' % (port1['id']): port1_acls_old,
'%s' % (port2['id']): port2_acls_old}
acl_obj_dict = {str(aclport1_old1): 'row1',
str(aclport1_old2): 'row2',
str(aclport1_old3): 'row3',
str(aclport2_old1): 'row4',
str(aclport2_old2): 'row5',
str(aclport2_old3): 'row6'}
# NEW ACLs, allow IPv6 communication
aclport1_new1 = {'priority': 1002, 'direction': 'from-lport',
'lport': port1['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip4 && (ip.src == %s)' %
(port1['id'], port1['fixed_ips'][0]['ip_address'])}
aclport1_new2 = {'priority': 1002, 'direction': 'from-lport',
'lport': port1['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip6 && (ip.src == %s)' %
(port1['id'], port1['fixed_ips'][1]['ip_address'])}
aclport1_new3 = {'priority': 1002, 'direction': 'to-lport',
'lport': port1['id'], 'lswitch': lswitch_name,
'match': 'ip6 && (ip.src == %s)' %
(port2['fixed_ips'][1]['ip_address'])}
port1_acls_new = [aclport1_new1, aclport1_new2, aclport1_new3]
aclport2_new1 = {'priority': 1002, 'direction': 'from-lport',
'lport': port2['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip4 && (ip.src == %s)' %
(port2['id'], port2['fixed_ips'][0]['ip_address'])}
aclport2_new2 = {'priority': 1002, 'direction': 'from-lport',
'lport': port2['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip6 && (ip.src == %s)' %
(port2['id'], port2['fixed_ips'][1]['ip_address'])}
aclport2_new3 = {'priority': 1002, 'direction': 'to-lport',
'lport': port2['id'], 'lswitch': lswitch_name,
'match': 'ip6 && (ip.src == %s)' %
(port1['fixed_ips'][1]['ip_address'])}
port2_acls_new = [aclport2_new1, aclport2_new2, aclport2_new3]
acls_new_dict = {'%s' % (port1['id']): port1_acls_new,
'%s' % (port2['id']): port2_acls_new}
acls_new_dict_copy = copy.deepcopy(acls_new_dict)
# Invoke _compute_acl_differences
update_cmd = cmd.UpdateACLsCommand(self.driver._nb_ovn,
[lswitch_name],
iter(ports),
acls_new_dict
)
acl_dels, acl_adds =\
update_cmd._compute_acl_differences(iter(ports),
acls_old_dict,
acls_new_dict,
acl_obj_dict)
# Expected Difference (Sorted)
acl_del_exp = {lswitch_name: ['row3', 'row6']}
acl_adds_exp = {lswitch_name:
[{'priority': 1002, 'direction': 'to-lport',
'match': 'ip6 && (ip.src == %s)' %
(port2['fixed_ips'][1]['ip_address'])},
{'priority': 1002, 'direction': 'to-lport',
'match': 'ip6 && (ip.src == %s)' %
(port1['fixed_ips'][1]['ip_address'])}]}
self.assertEqual(acl_del_exp, acl_dels)
self.assertEqual(acl_adds_exp, acl_adds)
# make sure argument add_acl=False will take no affect in
# need_compare=True scenario
update_cmd_with_acl = cmd.UpdateACLsCommand(self.driver._nb_ovn,
[lswitch_name],
iter(ports),
acls_new_dict_copy,
need_compare=True,
is_add_acl=False)
new_acl_dels, new_acl_adds =\
update_cmd_with_acl._compute_acl_differences(iter(ports),
acls_old_dict,
acls_new_dict_copy,
acl_obj_dict)
self.assertEqual(acl_dels, new_acl_dels)
self.assertEqual(acl_adds, new_acl_adds)
def test__get_update_data_without_compare(self):
lswitch_name = 'lswitch-1'
port1 = {'id': 'port-id1',
'network_id': lswitch_name,
'fixed_ips': mock.Mock()}
port2 = {'id': 'port-id2',
'network_id': lswitch_name,
'fixed_ips': mock.Mock()}
ports = [port1, port2]
aclport1_new = {'priority': 1002, 'direction': 'to-lport',
'match': 'outport == %s && ip4 && icmp4' %
(port1['id']), 'external_ids': {}}
aclport2_new = {'priority': 1002, 'direction': 'to-lport',
'match': 'outport == %s && ip4 && icmp4' %
(port2['id']), 'external_ids': {}}
acls_new_dict = {'%s' % (port1['id']): aclport1_new,
'%s' % (port2['id']): aclport2_new}
# test for creating new acls
update_cmd_add_acl = cmd.UpdateACLsCommand(self.driver._nb_ovn,
[lswitch_name],
iter(ports),
acls_new_dict,
need_compare=False,
is_add_acl=True)
lswitch_dict, acl_del_dict, acl_add_dict = \
update_cmd_add_acl._get_update_data_without_compare()
self.assertIn('neutron-lswitch-1', lswitch_dict)
self.assertEqual({}, acl_del_dict)
expected_acls = {'neutron-lswitch-1': [aclport1_new, aclport2_new]}
self.assertEqual(expected_acls, acl_add_dict)
# test for deleting existing acls
acl1 = mock.Mock(
match='outport == port-id1 && ip4 && icmp4', external_ids={})
acl2 = mock.Mock(
match='outport == port-id2 && ip4 && icmp4', external_ids={})
acl3 = mock.Mock(
match='outport == port-id1 && ip4 && (ip4.src == fake_ip)',
external_ids={})
lswitch_obj = mock.Mock(
name='neutron-lswitch-1', acls=[acl1, acl2, acl3])
with mock.patch('ovsdbapp.backend.ovs_idl.idlutils.row_by_value',
return_value=lswitch_obj):
update_cmd_del_acl = cmd.UpdateACLsCommand(self.driver._nb_ovn,
[lswitch_name],
iter(ports),
acls_new_dict,
need_compare=False,
is_add_acl=False)
lswitch_dict, acl_del_dict, acl_add_dict = \
update_cmd_del_acl._get_update_data_without_compare()
self.assertIn('neutron-lswitch-1', lswitch_dict)
expected_acls = {'neutron-lswitch-1': [acl1, acl2]}
self.assertEqual(expected_acls, acl_del_dict)
self.assertEqual({}, acl_add_dict)
def test_acl_protocol_and_ports_for_tcp_udp_and_sctp_number(self):
sg_rule = {'port_range_min': None,
'port_range_max': None}

View File

@ -16,10 +16,8 @@ from unittest import mock
from ovsdbapp.backend.ovs_idl import idlutils
from neutron.common.ovn import acl as ovn_acl
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import exceptions as ovn_exc
from neutron.common.ovn import utils as ovn_utils
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import commands
from neutron.tests import base
from neutron.tests.unit import fake_resources as fakes
@ -744,117 +742,6 @@ class TestDelACLCommand(TestBaseCommand):
fake_lswitch.delvalue.assert_called_once_with('acls', mock.ANY)
class TestUpdateACLsCommand(TestBaseCommand):
def test_lswitch_no_exist(self):
fake_lswitch = fakes.FakeOvsdbRow.create_one_ovsdb_row()
self.ovn_api.get_acls_for_lswitches.return_value = ({}, {}, {})
cmd = commands.UpdateACLsCommand(
self.ovn_api, [fake_lswitch.name], port_list=[],
acl_new_values_dict={},
need_compare=True)
cmd.run_idl(self.transaction)
self.transaction.insert.assert_not_called()
fake_lswitch.addvalue.assert_not_called()
fake_lswitch.delvalue.assert_not_called()
def _test_acl_update_no_acls(self, need_compare):
fake_lswitch = fakes.FakeOvsdbRow.create_one_ovsdb_row()
self.ovn_api.get_acls_for_lswitches.return_value = (
{}, {}, {fake_lswitch.name: fake_lswitch})
with mock.patch.object(idlutils, 'row_by_value',
return_value=fake_lswitch):
cmd = commands.UpdateACLsCommand(
self.ovn_api, [fake_lswitch.name], port_list=[],
acl_new_values_dict={},
need_compare=need_compare)
cmd.run_idl(self.transaction)
self.transaction.insert.assert_not_called()
fake_lswitch.addvalue.assert_not_called()
fake_lswitch.delvalue.assert_not_called()
def test_acl_update_compare_no_acls(self):
self._test_acl_update_no_acls(need_compare=True)
def test_acl_update_no_compare_no_acls(self):
self._test_acl_update_no_acls(need_compare=False)
def test_acl_update_compare_acls(self):
fake_sg_rule = \
fakes.FakeSecurityGroupRule.create_one_security_group_rule().info()
fake_port = fakes.FakePort.create_one_port().info()
fake_add_acl = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'match': 'add_acl'})
fake_del_acl = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'match': 'del_acl'})
fake_lswitch = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'name': ovn_utils.ovn_name(fake_port['network_id']),
'acls': []})
add_acl = ovn_acl.add_sg_rule_acl_for_port(
fake_port, fake_sg_rule, 'add_acl')
self.ovn_api.get_acls_for_lswitches.return_value = (
{fake_port['id']: [fake_del_acl.match]},
{fake_del_acl.match: fake_del_acl},
{fake_lswitch.name.replace('neutron-', ''): fake_lswitch})
cmd = commands.UpdateACLsCommand(
self.ovn_api, [fake_port['network_id']],
[fake_port], {fake_port['id']: [add_acl]},
need_compare=True)
self.transaction.insert.return_value = fake_add_acl
cmd.run_idl(self.transaction)
self.transaction.insert.assert_called_once_with(
self.ovn_api._tables['ACL'])
fake_lswitch.addvalue.assert_called_with('acls', fake_add_acl.uuid)
def test_acl_update_no_compare_add_acls(self):
fake_sg_rule = \
fakes.FakeSecurityGroupRule.create_one_security_group_rule().info()
fake_port = fakes.FakePort.create_one_port().info()
fake_acl = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'match': '*'})
fake_lswitch = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'name': ovn_utils.ovn_name(fake_port['network_id'])})
add_acl = ovn_acl.add_sg_rule_acl_for_port(
fake_port, fake_sg_rule, '*')
with mock.patch.object(idlutils, 'row_by_value',
return_value=fake_lswitch):
self.transaction.insert.return_value = fake_acl
cmd = commands.UpdateACLsCommand(
self.ovn_api, [fake_port['network_id']],
[fake_port], {fake_port['id']: add_acl},
need_compare=False,
is_add_acl=True)
cmd.run_idl(self.transaction)
self.transaction.insert.assert_called_once_with(
self.ovn_api._tables['ACL'])
fake_lswitch.addvalue.assert_called_once_with(
'acls', fake_acl.uuid)
def test_acl_update_no_compare_del_acls(self):
fake_sg_rule = \
fakes.FakeSecurityGroupRule.create_one_security_group_rule().info()
fake_port = fakes.FakePort.create_one_port().info()
fake_acl = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'match': '*', 'external_ids':
{'neutron:lport': fake_port['id'],
'neutron:security_group_rule_id': fake_sg_rule['id']}})
fake_lswitch = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'name': ovn_utils.ovn_name(fake_port['network_id']),
'acls': [fake_acl]})
del_acl = ovn_acl.add_sg_rule_acl_for_port(
fake_port, fake_sg_rule, '*')
with mock.patch.object(idlutils, 'row_by_value',
return_value=fake_lswitch):
cmd = commands.UpdateACLsCommand(
self.ovn_api, [fake_port['network_id']],
[fake_port], {fake_port['id']: del_acl},
need_compare=False,
is_add_acl=False)
cmd.run_idl(self.transaction)
self.transaction.insert.assert_not_called()
fake_lswitch.delvalue.assert_called_with('acls', mock.ANY)
class TestAddStaticRouteCommand(TestBaseCommand):
def test_lrouter_not_found(self):