[OVN] Move OVN ACL commands to Neutron tree

Once networking-ovn.common.acl code is in Neutron tree, the ACL
commands can be implemented.

Previous paths in networking-ovn tree:
./networking_ovn/ovsdb/commands.py ->
  ./neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/commands.py

Co-Authored-By: Russell Bryant <rbryant@redhat.com>

Change-Id: Ic0cb48887901d16ad0304be6ad55dca1df4fadd0
Partially-Implements: blueprint neutron-ovn-merge
This commit is contained in:
Rodolfo Alonso Hernandez 2019-11-29 13:42:18 +00:00
parent b8cf079b76
commit f77c1037e4
3 changed files with 561 additions and 1 deletions

View File

@ -441,6 +441,213 @@ class SetLRouterPortInLSwitchPortCommand(command.BaseCommand):
setattr(port, 'addresses', self.lsp_address)
class AddACLCommand(command.BaseCommand):
def __init__(self, api, lswitch, lport, **columns):
super(AddACLCommand, self).__init__(api)
self.lswitch = lswitch
self.lport = lport
self.columns = columns
def run_idl(self, txn):
try:
lswitch = idlutils.row_by_value(self.api.idl, 'Logical_Switch',
'name', self.lswitch)
except idlutils.RowNotFound:
msg = _("Logical Switch %s does not exist") % self.lswitch
raise RuntimeError(msg)
row = txn.insert(self.api._tables['ACL'])
for col, val in self.columns.items():
setattr(row, col, val)
_addvalue_to_list(lswitch, 'acls', row.uuid)
class DelACLCommand(command.BaseCommand):
def __init__(self, api, lswitch, lport, if_exists):
super(DelACLCommand, self).__init__(api)
self.lswitch = lswitch
self.lport = lport
self.if_exists = if_exists
def run_idl(self, txn):
try:
lswitch = idlutils.row_by_value(self.api.idl, 'Logical_Switch',
'name', self.lswitch)
except idlutils.RowNotFound:
if self.if_exists:
return
msg = _("Logical Switch %s does not exist") % self.lswitch
raise RuntimeError(msg)
acls_to_del = []
acls = getattr(lswitch, 'acls', [])
for acl in acls:
ext_ids = getattr(acl, 'external_ids', {})
if ext_ids.get('neutron:lport') == self.lport:
acls_to_del.append(acl)
for acl in acls_to_del:
acl.delete()
_updatevalues_in_list(lswitch, 'acls', old_values=acls_to_del)
class UpdateACLsCommand(command.BaseCommand):
def __init__(self, api, lswitch_names, port_list, acl_new_values_dict,
need_compare=True, is_add_acl=True):
"""This command updates the acl list for the logical switches
@param lswitch_names: List of Logical Switch Names
@type lswitch_names: []
@param port_list: Iterator of List of Ports
@type port_list: []
@param acl_new_values_dict: Dictionary of acls indexed by port id
@type acl_new_values_dict: {}
@need_compare: If acl_new_values_dict needs be compared with existing
acls.
@type: Boolean.
@is_add_acl: If updating is caused by acl adding action.
@type: Boolean.
"""
super(UpdateACLsCommand, self).__init__(api)
self.lswitch_names = lswitch_names
self.port_list = port_list
self.acl_new_values_dict = acl_new_values_dict
self.need_compare = need_compare
self.is_add_acl = is_add_acl
def _acl_list_sub(self, acl_list1, acl_list2):
"""Compute the elements in acl_list1 but not in acl_list2.
If acl_list1 and acl_list2 were sets, the result of this routine
could be thought of as acl_list1 - acl_list2. Note that acl_list1
and acl_list2 cannot actually be sets as they contain dictionary
items i.e. set([{'a':1}) doesn't work.
"""
acl_diff = []
for acl in acl_list1:
if acl not in acl_list2:
acl_diff.append(acl)
return acl_diff
def _compute_acl_differences(self, port_list, acl_old_values_dict,
acl_new_values_dict, acl_obj_dict):
"""Compute the difference between the new and old sets of acls
@param port_list: Iterator of a List of ports
@type port_list: []
@param acl_old_values_dict: Dictionary of old acl values indexed
by port id
@param acl_new_values_dict: Dictionary of new acl values indexed
by port id
@param acl_obj_dict: Dictionary of acl objects indexed by the acl
value in string format.
@var acl_del_objs_dict: Dictionary of acl objects to be deleted
indexed by the lswitch.
@var acl_add_values_dict: Dictionary of acl values to be added
indexed by the lswitch.
@return: (acl_del_objs_dict, acl_add_values_dict)
@rtype: ({}, {})
"""
acl_del_objs_dict = {}
acl_add_values_dict = {}
for port in port_list:
lswitch_name = port['network_id']
acls_old = acl_old_values_dict.get(port['id'], [])
acls_new = acl_new_values_dict.get(port['id'], [])
acls_del = self._acl_list_sub(acls_old, acls_new)
acls_add = self._acl_list_sub(acls_new, acls_old)
acl_del_objs = acl_del_objs_dict.setdefault(lswitch_name, [])
for acl in acls_del:
acl_del_objs.append(acl_obj_dict[str(acl)])
acl_add_values = acl_add_values_dict.setdefault(lswitch_name, [])
for acl in acls_add:
# Remove lport and lswitch columns
del acl['lswitch']
del acl['lport']
acl_add_values.append(acl)
return acl_del_objs_dict, acl_add_values_dict
def _get_update_data_without_compare(self):
lswitch_ovsdb_dict = {}
for switch_name in self.lswitch_names:
switch_name = utils.ovn_name(switch_name)
lswitch = idlutils.row_by_value(self.api.idl, 'Logical_Switch',
'name', switch_name)
lswitch_ovsdb_dict[switch_name] = lswitch
if self.is_add_acl:
acl_add_values_dict = {}
for port in self.port_list:
switch_name = utils.ovn_name(port['network_id'])
if switch_name not in acl_add_values_dict:
acl_add_values_dict[switch_name] = []
if port['id'] in self.acl_new_values_dict:
acl_add_values_dict[switch_name].append(
self.acl_new_values_dict[port['id']])
acl_del_objs_dict = {}
else:
acl_add_values_dict = {}
acl_del_objs_dict = {}
del_acl_extids = []
for acl_dict in self.acl_new_values_dict.values():
del_acl_extids.append({acl_dict['match']:
acl_dict['external_ids']})
for switch_name, lswitch in lswitch_ovsdb_dict.items():
if switch_name not in acl_del_objs_dict:
acl_del_objs_dict[switch_name] = []
acls = getattr(lswitch, 'acls', [])
for acl in acls:
match = getattr(acl, 'match')
acl_extids = {match: getattr(acl, 'external_ids')}
if acl_extids in del_acl_extids:
acl_del_objs_dict[switch_name].append(acl)
return lswitch_ovsdb_dict, acl_del_objs_dict, acl_add_values_dict
def run_idl(self, txn):
if self.need_compare:
# Get all relevant ACLs in 1 shot
acl_values_dict, acl_obj_dict, lswitch_ovsdb_dict = (
self.api.get_acls_for_lswitches(self.lswitch_names))
# Compute the difference between the new and old set of ACLs
acl_del_objs_dict, acl_add_values_dict = (
self._compute_acl_differences(
self.port_list, acl_values_dict,
self.acl_new_values_dict, acl_obj_dict))
else:
lswitch_ovsdb_dict, acl_del_objs_dict, acl_add_values_dict = (
self._get_update_data_without_compare())
for lswitch_name, lswitch in lswitch_ovsdb_dict.items():
acl_del_objs = acl_del_objs_dict.get(lswitch_name, [])
acl_add_values = acl_add_values_dict.get(lswitch_name, [])
# Continue if no ACLs to add or delete.
if not acl_del_objs and not acl_add_values:
continue
# Delete old ACLs.
if acl_del_objs:
for acl_del_obj in acl_del_objs:
acl_del_obj.delete()
# Add new ACLs.
acl_add_objs = None
if acl_add_values:
acl_add_objs = []
for acl_value in acl_add_values:
row = txn.insert(self.api._tables['ACL'])
for col, val in acl_value.items():
setattr(row, col, val)
acl_add_objs.append(row.uuid)
# Update logical switch ACLs.
_updatevalues_in_list(lswitch, 'acls',
new_values=acl_add_objs,
old_values=acl_del_objs)
class AddStaticRouteCommand(command.BaseCommand):
def __init__(self, api, lrouter, **columns):
super(AddStaticRouteCommand, self).__init__(api)

View File

@ -10,7 +10,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
import mock
from neutron_lib import constants as const
@ -21,6 +22,7 @@ from neutron.common.ovn import acl as ovn_acl
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils as ovn_utils
from neutron.conf.agent import securitygroups_rpc
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import commands as cmd
from neutron.tests import base
from neutron.tests.unit import fake_resources as fakes
@ -168,6 +170,184 @@ class TestACLs(base.BaseTestCase):
'from-lport',
match)
def test__update_acls_compute_difference(self):
lswitch_name = 'lswitch-1'
port1 = {'id': 'port-id1',
'network_id': lswitch_name,
'fixed_ips': [{'subnet_id': 'subnet-id',
'ip_address': '1.1.1.101'},
{'subnet_id': 'subnet-id-v6',
'ip_address': '2001:0db8::1:0:0:1'}]}
port2 = {'id': 'port-id2',
'network_id': lswitch_name,
'fixed_ips': [{'subnet_id': 'subnet-id',
'ip_address': '1.1.1.102'},
{'subnet_id': 'subnet-id-v6',
'ip_address': '2001:0db8::1:0:0:2'}]}
ports = [port1, port2]
# OLD ACLs, allow IPv4 communication
aclport1_old1 = {'priority': 1002, 'direction': 'from-lport',
'lport': port1['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip4 && (ip.src == %s)' %
(port1['id'], port1['fixed_ips'][0]['ip_address'])}
aclport1_old2 = {'priority': 1002, 'direction': 'from-lport',
'lport': port1['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip6 && (ip.src == %s)' %
(port1['id'], port1['fixed_ips'][1]['ip_address'])}
aclport1_old3 = {'priority': 1002, 'direction': 'to-lport',
'lport': port1['id'], 'lswitch': lswitch_name,
'match': 'ip4 && (ip.src == %s)' %
(port2['fixed_ips'][0]['ip_address'])}
port1_acls_old = [aclport1_old1, aclport1_old2, aclport1_old3]
aclport2_old1 = {'priority': 1002, 'direction': 'from-lport',
'lport': port2['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip4 && (ip.src == %s)' %
(port2['id'], port2['fixed_ips'][0]['ip_address'])}
aclport2_old2 = {'priority': 1002, 'direction': 'from-lport',
'lport': port2['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip6 && (ip.src == %s)' %
(port2['id'], port2['fixed_ips'][1]['ip_address'])}
aclport2_old3 = {'priority': 1002, 'direction': 'to-lport',
'lport': port2['id'], 'lswitch': lswitch_name,
'match': 'ip4 && (ip.src == %s)' %
(port1['fixed_ips'][0]['ip_address'])}
port2_acls_old = [aclport2_old1, aclport2_old2, aclport2_old3]
acls_old_dict = {'%s' % (port1['id']): port1_acls_old,
'%s' % (port2['id']): port2_acls_old}
acl_obj_dict = {str(aclport1_old1): 'row1',
str(aclport1_old2): 'row2',
str(aclport1_old3): 'row3',
str(aclport2_old1): 'row4',
str(aclport2_old2): 'row5',
str(aclport2_old3): 'row6'}
# NEW ACLs, allow IPv6 communication
aclport1_new1 = {'priority': 1002, 'direction': 'from-lport',
'lport': port1['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip4 && (ip.src == %s)' %
(port1['id'], port1['fixed_ips'][0]['ip_address'])}
aclport1_new2 = {'priority': 1002, 'direction': 'from-lport',
'lport': port1['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip6 && (ip.src == %s)' %
(port1['id'], port1['fixed_ips'][1]['ip_address'])}
aclport1_new3 = {'priority': 1002, 'direction': 'to-lport',
'lport': port1['id'], 'lswitch': lswitch_name,
'match': 'ip6 && (ip.src == %s)' %
(port2['fixed_ips'][1]['ip_address'])}
port1_acls_new = [aclport1_new1, aclport1_new2, aclport1_new3]
aclport2_new1 = {'priority': 1002, 'direction': 'from-lport',
'lport': port2['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip4 && (ip.src == %s)' %
(port2['id'], port2['fixed_ips'][0]['ip_address'])}
aclport2_new2 = {'priority': 1002, 'direction': 'from-lport',
'lport': port2['id'], 'lswitch': lswitch_name,
'match': 'inport == %s && ip6 && (ip.src == %s)' %
(port2['id'], port2['fixed_ips'][1]['ip_address'])}
aclport2_new3 = {'priority': 1002, 'direction': 'to-lport',
'lport': port2['id'], 'lswitch': lswitch_name,
'match': 'ip6 && (ip.src == %s)' %
(port1['fixed_ips'][1]['ip_address'])}
port2_acls_new = [aclport2_new1, aclport2_new2, aclport2_new3]
acls_new_dict = {'%s' % (port1['id']): port1_acls_new,
'%s' % (port2['id']): port2_acls_new}
acls_new_dict_copy = copy.deepcopy(acls_new_dict)
# Invoke _compute_acl_differences
update_cmd = cmd.UpdateACLsCommand(self.driver._nb_ovn,
[lswitch_name],
iter(ports),
acls_new_dict
)
acl_dels, acl_adds =\
update_cmd._compute_acl_differences(iter(ports),
acls_old_dict,
acls_new_dict,
acl_obj_dict)
# Expected Difference (Sorted)
acl_del_exp = {lswitch_name: ['row3', 'row6']}
acl_adds_exp = {lswitch_name:
[{'priority': 1002, 'direction': 'to-lport',
'match': 'ip6 && (ip.src == %s)' %
(port2['fixed_ips'][1]['ip_address'])},
{'priority': 1002, 'direction': 'to-lport',
'match': 'ip6 && (ip.src == %s)' %
(port1['fixed_ips'][1]['ip_address'])}]}
self.assertEqual(acl_del_exp, acl_dels)
self.assertEqual(acl_adds_exp, acl_adds)
# make sure argument add_acl=False will take no affect in
# need_compare=True scenario
update_cmd_with_acl = cmd.UpdateACLsCommand(self.driver._nb_ovn,
[lswitch_name],
iter(ports),
acls_new_dict_copy,
need_compare=True,
is_add_acl=False)
new_acl_dels, new_acl_adds =\
update_cmd_with_acl._compute_acl_differences(iter(ports),
acls_old_dict,
acls_new_dict_copy,
acl_obj_dict)
self.assertEqual(acl_dels, new_acl_dels)
self.assertEqual(acl_adds, new_acl_adds)
def test__get_update_data_without_compare(self):
lswitch_name = 'lswitch-1'
port1 = {'id': 'port-id1',
'network_id': lswitch_name,
'fixed_ips': mock.Mock()}
port2 = {'id': 'port-id2',
'network_id': lswitch_name,
'fixed_ips': mock.Mock()}
ports = [port1, port2]
aclport1_new = {'priority': 1002, 'direction': 'to-lport',
'match': 'outport == %s && ip4 && icmp4' %
(port1['id']), 'external_ids': {}}
aclport2_new = {'priority': 1002, 'direction': 'to-lport',
'match': 'outport == %s && ip4 && icmp4' %
(port2['id']), 'external_ids': {}}
acls_new_dict = {'%s' % (port1['id']): aclport1_new,
'%s' % (port2['id']): aclport2_new}
# test for creating new acls
update_cmd_add_acl = cmd.UpdateACLsCommand(self.driver._nb_ovn,
[lswitch_name],
iter(ports),
acls_new_dict,
need_compare=False,
is_add_acl=True)
lswitch_dict, acl_del_dict, acl_add_dict = \
update_cmd_add_acl._get_update_data_without_compare()
self.assertIn('neutron-lswitch-1', lswitch_dict)
self.assertEqual({}, acl_del_dict)
expected_acls = {'neutron-lswitch-1': [aclport1_new, aclport2_new]}
self.assertEqual(expected_acls, acl_add_dict)
# test for deleting existing acls
acl1 = mock.Mock(
match='outport == port-id1 && ip4 && icmp4', external_ids={})
acl2 = mock.Mock(
match='outport == port-id2 && ip4 && icmp4', external_ids={})
acl3 = mock.Mock(
match='outport == port-id1 && ip4 && (ip4.src == fake_ip)',
external_ids={})
lswitch_obj = mock.Mock(
name='neutron-lswitch-1', acls=[acl1, acl2, acl3])
with mock.patch('ovsdbapp.backend.ovs_idl.idlutils.row_by_value',
return_value=lswitch_obj):
update_cmd_del_acl = cmd.UpdateACLsCommand(self.driver._nb_ovn,
[lswitch_name],
iter(ports),
acls_new_dict,
need_compare=False,
is_add_acl=False)
lswitch_dict, acl_del_dict, acl_add_dict = \
update_cmd_del_acl._get_update_data_without_compare()
self.assertIn('neutron-lswitch-1', lswitch_dict)
expected_acls = {'neutron-lswitch-1': [acl1, acl2]}
self.assertEqual(expected_acls, acl_del_dict)
self.assertEqual({}, acl_add_dict)
def test_acl_protocol_and_ports_for_tcp_udp_and_sctp_number(self):
sg_rule = {'port_range_min': None,
'port_range_max': None}

View File

@ -15,8 +15,10 @@
import mock
from ovsdbapp.backend.ovs_idl import idlutils
from neutron.common.ovn import acl as ovn_acl
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import exceptions as ovn_exc
from neutron.common.ovn import utils as ovn_utils
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import commands
from neutron.tests import base
from neutron.tests.unit import fake_resources as fakes
@ -718,6 +720,177 @@ class TestSetLRouterPortInLSwitchPortCommand(TestBaseCommand):
self.assertEqual('router', fake_lsp.addresses)
class TestAddACLCommand(TestBaseCommand):
def test_lswitch_no_exist(self, if_exists=True):
with mock.patch.object(idlutils, 'row_by_value',
side_effect=idlutils.RowNotFound):
cmd = commands.AddACLCommand(
self.ovn_api, 'fake-lswitch', 'fake-lsp')
self.assertRaises(RuntimeError, cmd.run_idl, self.transaction)
def test_acl_add(self):
fake_lswitch = fakes.FakeOvsdbRow.create_one_ovsdb_row()
with mock.patch.object(idlutils, 'row_by_value',
return_value=fake_lswitch):
fake_acl = fakes.FakeOvsdbRow.create_one_ovsdb_row()
self.transaction.insert.return_value = fake_acl
cmd = commands.AddACLCommand(
self.ovn_api, fake_lswitch.name, 'fake-lsp', match='*')
cmd.run_idl(self.transaction)
self.transaction.insert.assert_called_once_with(
self.ovn_api._tables['ACL'])
fake_lswitch.addvalue.assert_called_once_with(
'acls', fake_acl.uuid)
self.assertEqual('*', fake_acl.match)
class TestDelACLCommand(TestBaseCommand):
def _test_lswitch_no_exist(self, if_exists=True):
with mock.patch.object(idlutils, 'row_by_value',
side_effect=idlutils.RowNotFound):
cmd = commands.DelACLCommand(
self.ovn_api, 'fake-lswitch', 'fake-lsp',
if_exists=if_exists)
if if_exists:
cmd.run_idl(self.transaction)
else:
self.assertRaises(RuntimeError, cmd.run_idl, self.transaction)
def test_lswitch_no_exist_ignore(self):
self._test_lswitch_no_exist(if_exists=True)
def test_lswitch_no_exist_fail(self):
self._test_lswitch_no_exist(if_exists=False)
def test_acl_del(self):
fake_lsp_name = 'fake-lsp'
fake_acl_del = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'external_ids': {'neutron:lport': fake_lsp_name}})
fake_acl_save = mock.ANY
fake_lswitch = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'acls': [fake_acl_del, fake_acl_save]})
with mock.patch.object(idlutils, 'row_by_value',
return_value=fake_lswitch):
cmd = commands.DelACLCommand(
self.ovn_api, fake_lswitch.name, fake_lsp_name,
if_exists=True)
cmd.run_idl(self.transaction)
fake_lswitch.delvalue.assert_called_once_with('acls', mock.ANY)
class TestUpdateACLsCommand(TestBaseCommand):
def test_lswitch_no_exist(self):
fake_lswitch = fakes.FakeOvsdbRow.create_one_ovsdb_row()
self.ovn_api.get_acls_for_lswitches.return_value = ({}, {}, {})
cmd = commands.UpdateACLsCommand(
self.ovn_api, [fake_lswitch.name], port_list=[],
acl_new_values_dict={},
need_compare=True)
cmd.run_idl(self.transaction)
self.transaction.insert.assert_not_called()
fake_lswitch.addvalue.assert_not_called()
fake_lswitch.delvalue.assert_not_called()
def _test_acl_update_no_acls(self, need_compare):
fake_lswitch = fakes.FakeOvsdbRow.create_one_ovsdb_row()
self.ovn_api.get_acls_for_lswitches.return_value = (
{}, {}, {fake_lswitch.name: fake_lswitch})
with mock.patch.object(idlutils, 'row_by_value',
return_value=fake_lswitch):
cmd = commands.UpdateACLsCommand(
self.ovn_api, [fake_lswitch.name], port_list=[],
acl_new_values_dict={},
need_compare=need_compare)
cmd.run_idl(self.transaction)
self.transaction.insert.assert_not_called()
fake_lswitch.addvalue.assert_not_called()
fake_lswitch.delvalue.assert_not_called()
def test_acl_update_compare_no_acls(self):
self._test_acl_update_no_acls(need_compare=True)
def test_acl_update_no_compare_no_acls(self):
self._test_acl_update_no_acls(need_compare=False)
def test_acl_update_compare_acls(self):
fake_sg_rule = \
fakes.FakeSecurityGroupRule.create_one_security_group_rule().info()
fake_port = fakes.FakePort.create_one_port().info()
fake_add_acl = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'match': 'add_acl'})
fake_del_acl = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'match': 'del_acl'})
fake_lswitch = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'name': ovn_utils.ovn_name(fake_port['network_id']),
'acls': []})
add_acl = ovn_acl.add_sg_rule_acl_for_port(
fake_port, fake_sg_rule, 'add_acl')
self.ovn_api.get_acls_for_lswitches.return_value = (
{fake_port['id']: [fake_del_acl.match]},
{fake_del_acl.match: fake_del_acl},
{fake_lswitch.name.replace('neutron-', ''): fake_lswitch})
cmd = commands.UpdateACLsCommand(
self.ovn_api, [fake_port['network_id']],
[fake_port], {fake_port['id']: [add_acl]},
need_compare=True)
self.transaction.insert.return_value = fake_add_acl
cmd.run_idl(self.transaction)
self.transaction.insert.assert_called_once_with(
self.ovn_api._tables['ACL'])
fake_lswitch.addvalue.assert_called_with('acls', fake_add_acl.uuid)
def test_acl_update_no_compare_add_acls(self):
fake_sg_rule = \
fakes.FakeSecurityGroupRule.create_one_security_group_rule().info()
fake_port = fakes.FakePort.create_one_port().info()
fake_acl = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'match': '*'})
fake_lswitch = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'name': ovn_utils.ovn_name(fake_port['network_id'])})
add_acl = ovn_acl.add_sg_rule_acl_for_port(
fake_port, fake_sg_rule, '*')
with mock.patch.object(idlutils, 'row_by_value',
return_value=fake_lswitch):
self.transaction.insert.return_value = fake_acl
cmd = commands.UpdateACLsCommand(
self.ovn_api, [fake_port['network_id']],
[fake_port], {fake_port['id']: add_acl},
need_compare=False,
is_add_acl=True)
cmd.run_idl(self.transaction)
self.transaction.insert.assert_called_once_with(
self.ovn_api._tables['ACL'])
fake_lswitch.addvalue.assert_called_once_with(
'acls', fake_acl.uuid)
def test_acl_update_no_compare_del_acls(self):
fake_sg_rule = \
fakes.FakeSecurityGroupRule.create_one_security_group_rule().info()
fake_port = fakes.FakePort.create_one_port().info()
fake_acl = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'match': '*', 'external_ids':
{'neutron:lport': fake_port['id'],
'neutron:security_group_rule_id': fake_sg_rule['id']}})
fake_lswitch = fakes.FakeOvsdbRow.create_one_ovsdb_row(
attrs={'name': ovn_utils.ovn_name(fake_port['network_id']),
'acls': [fake_acl]})
del_acl = ovn_acl.add_sg_rule_acl_for_port(
fake_port, fake_sg_rule, '*')
with mock.patch.object(idlutils, 'row_by_value',
return_value=fake_lswitch):
cmd = commands.UpdateACLsCommand(
self.ovn_api, [fake_port['network_id']],
[fake_port], {fake_port['id']: del_acl},
need_compare=False,
is_add_acl=False)
cmd.run_idl(self.transaction)
self.transaction.insert.assert_not_called()
fake_lswitch.delvalue.assert_called_with('acls', mock.ANY)
class TestAddStaticRouteCommand(TestBaseCommand):
def test_lrouter_not_found(self):