Merge "[OVS][FW] Initialize ConjIdMap._max_id depending on the current OFs" into stable/train

This commit is contained in:
Zuul 2022-04-21 11:46:25 +00:00 committed by Gerrit Code Review
commit dced5f1792
4 changed files with 134 additions and 11 deletions

View File

@ -16,6 +16,8 @@
import collections
import contextlib
import copy
import itertools
import re
import eventlet
import netaddr
@ -41,6 +43,7 @@ from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants \
as ovs_consts
LOG = logging.getLogger(__name__)
CONJ_ID_REGEX = re.compile(r"conj_id=(\d+),")
def _replace_register(flow_params, register_number, register_value):
@ -262,33 +265,75 @@ class SGPortMap(object):
class ConjIdMap(object):
"""Handle conjunction ID allocations and deallocations."""
def __new__(cls):
CONJ_ID_BLOCK_SIZE = 8
MAX_CONJ_ID = 2 ** 32 - 8
def __new__(cls, int_br):
if not hasattr(cls, '_instance'):
cls._instance = super(ConjIdMap, cls).__new__(cls)
return cls._instance
def __init__(self):
def __init__(self, int_br):
self.id_map = collections.defaultdict(self._conj_id_factory)
# Stores the set of conjuntion IDs used for each unique tuple
# (sg_id, remote_sg_id, direction, ethertype). Each tuple can have up
# to 8 conjuntion IDs (see ConjIPFlowManager.add()).
self.id_map_group = collections.defaultdict(set)
self.id_free = collections.deque()
self.max_id = 0
self._max_id = self._init_max_id(int_br)
def _init_max_id(self, int_br):
"""Read the maximum conjunction ID number in the integration bridge
This method will dump all integration bridge flows, parse them and
return the maximum conjunction ID number. By default, "int_br" is a
``OVSAgentBridge`` instance, using "os-ken" library to access to the OF
rules.
If not, "int_br" will default to a ``OVSBridge`` instance. The CLI
command "ovs-ofctl" will be used instead.
:param int_br: ``OVSAgentBridge`` or ``OVSBridge`` instance.
:returns: The maximum conjunction ID number in the integration bridge
"""
conj_id_max = 0
try:
for flow in itertools.chain(
*[int_br.dump_flows(table)
for table in ovs_consts.OVS_FIREWALL_TABLES]):
conj_id_max = max(conj_id_max, flow.match.get('conj_id', 0))
except AttributeError: # br_int is a ``OVSBridge`` instance.
flows_iter = itertools.chain(
*[int_br.dump_flows_for_table(table)
for table in ovs_consts.OVS_FIREWALL_TABLES])
conj_ids = CONJ_ID_REGEX.findall(" | ".join(flows_iter))
try:
conj_id_max = max([int(conj_id) for conj_id in conj_ids])
except ValueError:
conj_id_max = 0
max_id = conj_id_max - conj_id_max % self.CONJ_ID_BLOCK_SIZE
return self._next_max_id(max_id)
def _next_max_id(self, max_id):
max_id += self.CONJ_ID_BLOCK_SIZE
if max_id >= self.MAX_CONJ_ID:
max_id = 0
return max_id
def _conj_id_factory(self):
# If there is any freed ID, use one.
if self.id_free:
return self.id_free.popleft()
# Allocate new one. It must be divisible by 8. (See the next function.)
self.max_id += 8
return self.max_id
self._max_id = self._next_max_id(self._max_id)
return self._max_id
def get_conj_id(self, sg_id, remote_sg_id, direction, ethertype):
"""Return a conjunction ID specified by the arguments.
Allocate one if necessary. The returned ID is divisible by 8,
as there are 4 priority levels (see rules.flow_priority_offset)
and 2 conjunction IDs are needed per priority.
Allocate one if necessary. The returned ID is divisible by 8
(CONJ_ID_BLOCK_SIZE), as there are 4 priority levels
(see rules.flow_priority_offset) and 2 conjunction IDs are needed per
priority.
"""
if direction not in [lib_const.EGRESS_DIRECTION,
lib_const.INGRESS_DIRECTION]:
@ -337,7 +382,7 @@ class ConjIPFlowManager(object):
"""
def __init__(self, driver):
self.conj_id_map = ConjIdMap()
self.conj_id_map = ConjIdMap(driver.int_br.br)
self.driver = driver
# The following two are dict of dicts and are indexed like:
# self.x[vlan_tag][(direction, ethertype)]

View File

@ -144,7 +144,7 @@ class OVSFirewallLoggingDriver(log_ext.LoggingDriver):
self.log_ports = collections.defaultdict(dict)
self.cookies_table = set()
self.cookie_ids_to_delete = set()
self.conj_id_map = ovsfw.ConjIdMap()
self.conj_id_map = ovsfw.ConjIdMap(self.int_br.br)
def initialize(self, resource_rpc, **kwargs):
self.resource_rpc = resource_rpc

View File

@ -18,6 +18,7 @@ from neutron_lib.callbacks import registry as callbacks_registry
from neutron_lib.callbacks import resources as callbacks_resources
from neutron_lib import constants
from neutron_lib.utils import helpers
from os_ken.ofproto import ofproto_v1_3_parser
from oslo_config import cfg
import testtools
@ -36,6 +37,36 @@ from neutron.tests import base
TESTING_VLAN_TAG = 1
TESTING_SEGMENT = 1000
MATCH_1 = ofproto_v1_3_parser.OFPMatch(
_ordered_fields=[('conj_id', 100), ('eth_type', 2048), ('reg5', 12),
('ct_state', (2, 14))])
MATCH_2 = ofproto_v1_3_parser.OFPMatch(
_ordered_fields=[('conj_id', 200), ('eth_type', 2048), ('reg5', 12),
('ct_state', (2, 14))])
MATCH_3 = ofproto_v1_3_parser.OFPMatch(
_ordered_fields=[('reg5', 13), ('ct_state', (10, 14)), ('ct_zone', 1),
('ct_mark', 0)])
MATCH_4 = ofproto_v1_3_parser.OFPMatch(
_ordered_fields=[('eth_type', 34525), ('ip_proto', 58),
('icmpv6_type', 136), ('reg5', 11)])
INIT_OF_RULES = [
mock.Mock(match=MATCH_1),
mock.Mock(match=MATCH_2),
mock.Mock(match=MATCH_3),
mock.Mock(match=MATCH_4),
]
INIT_OF_RULES_VSCTL = [
'priority=40,ct_state=+est,ip,reg5=0xd actions=ct(commit,zone=NXM_NX_REG6['
'0..15],exec(load:0x1->NXM_NX_CT_MARK[]))',
'priority=70,conj_id=100,ct_state=+est-rel-rpl,ip,reg5=0xc actions=load:0x'
'f0->NXM_NX_REG7[],output:12',
'priority=70,conj_id=200,ct_state=+est-rel-rpl,ipv6,reg5=0xc actions=load:'
'0xf8->NXM_NX_REG7[],output:12',
'priority=73,ct_state=+est-rel-rpl,ip,reg6=0x1,nw_src=10.10.0.42 actions=c'
'onjunction(118,1/2)',
'priority=70,ct_state=+est-rel-rpl,ip,reg6=0x1,nw_src=10.10.0.61 actions=c'
'onjunction(120,1/2)'
]
def create_ofport(port_dict, network_type=None,
@ -268,7 +299,11 @@ class TestSGPortMap(base.BaseTestCase):
class TestConjIdMap(base.BaseTestCase):
def setUp(self):
super(TestConjIdMap, self).setUp()
self.conj_id_map = ovsfw.ConjIdMap()
self.mock_int_br = mock.Mock()
self.dump_flows_ret = [[]] * len(ovs_consts.OVS_FIREWALL_TABLES)
self.dump_flows_ret[0] = INIT_OF_RULES
self.mock_int_br.dump_flows.side_effect = self.dump_flows_ret
self.conj_id_map = ovsfw.ConjIdMap(self.mock_int_br)
def test_get_conj_id(self):
allocated = []
@ -289,6 +324,7 @@ class TestConjIdMap(base.BaseTestCase):
constants.IPv6)
def test_delete_sg(self):
self.conj_id_map._max_id = 0
test_data = [
# conj_id: 8
('sg1', 'sg1', constants.INGRESS_DIRECTION, constants.IPv6, 0),
@ -334,11 +370,43 @@ class TestConjIdMap(base.BaseTestCase):
reallocated.add(self.conj_id_map.get_conj_id(*conj_id_tuple))
self.assertEqual(reallocated, conj_id_segment)
def test__init_max_id_os_ken(self):
self.mock_int_br.dump_flows.side_effect = self.dump_flows_ret
self.assertEqual(208, self.conj_id_map._init_max_id(self.mock_int_br))
match = ofproto_v1_3_parser.OFPMatch(
_ordered_fields=[('conj_id', 237), ('eth_type', 2048),
('reg5', 12), ('ct_state', (2, 14))])
new_rule = mock.Mock(match=match)
self.dump_flows_ret[0] = INIT_OF_RULES + [new_rule]
self.mock_int_br.dump_flows.side_effect = self.dump_flows_ret
self.assertEqual(240, self.conj_id_map._init_max_id(self.mock_int_br))
def test__init_max_id_vsctl(self):
self.mock_int_br.dump_flows.side_effect = AttributeError()
dump_flows_ret = [[]] * len(ovs_consts.OVS_FIREWALL_TABLES)
dump_flows_ret[0] = INIT_OF_RULES_VSCTL
self.mock_int_br.dump_flows_for_table.side_effect = dump_flows_ret
self.assertEqual(208, self.conj_id_map._init_max_id(self.mock_int_br))
new_rule = ('priority=70,conj_id=237,ct_state=+est-rel-rpl,ipv6,reg5=0'
'xc actions=load:0xf8->NXM_NX_REG7[],output:12')
dump_flows_ret[0] = INIT_OF_RULES_VSCTL + [new_rule]
self.mock_int_br.dump_flows_for_table.side_effect = dump_flows_ret
self.assertEqual(240, self.conj_id_map._init_max_id(self.mock_int_br))
def test__next_max_id(self):
self.assertEqual(8, self.conj_id_map._next_max_id(0))
self.assertEqual(0, self.conj_id_map._next_max_id(
self.conj_id_map.MAX_CONJ_ID - 1))
class TestConjIPFlowManager(base.BaseTestCase):
def setUp(self):
super(TestConjIPFlowManager, self).setUp()
self.driver = mock.Mock()
self.driver.int_br.br.dump_flows.return_value = INIT_OF_RULES
self.manager = ovsfw.ConjIPFlowManager(self.driver)
self.vlan_tag = 100
self.conj_id = 16
@ -1130,6 +1198,8 @@ class TestCookieContext(base.BaseTestCase):
ovsfw.OVSFirewallDriver, 'initialize_bridge',
return_value=bridge.deferred(
full_ordered=True, use_bundle=True)).start()
mock.patch.object(ovsfw.ConjIdMap, '_init_max_id',
return_value=0).start()
securitygroups_rpc.register_securitygroups_opts()
self.firewall = ovsfw.OVSFirewallDriver(bridge)

View File

@ -101,6 +101,12 @@ class FakeOVSPort(object):
class TestOVSFirewallLoggingDriver(base.BaseTestCase):
def setUp(self):
super(TestOVSFirewallLoggingDriver, self).setUp()
mock_int_br = mock.Mock()
mock_int_br.br.dump_flows.return_value = []
self._mock_initialize_bridge = mock.patch.object(
ovsfw_log.OVSFirewallLoggingDriver, 'initialize_bridge',
return_value=mock_int_br)
self.mock_initialize_bridge = self._mock_initialize_bridge.start()
self.log_driver = ovsfw_log.OVSFirewallLoggingDriver(mock.Mock())
resource_rpc_mock = mock.patch.object(
agent_rpc, 'LoggingApiStub', autospec=True).start()
@ -131,10 +137,12 @@ class TestOVSFirewallLoggingDriver(base.BaseTestCase):
return self.mock_bridge.br.get_vif_port_by_id.return_value.vif_mac
def test_initialize_bridge(self):
self._mock_initialize_bridge.stop()
br = self.log_driver.initialize_bridge(self.mock_bridge)
self.assertEqual(self.mock_bridge.deferred.return_value, br)
def test_set_controller_rate_limit(self):
self._mock_initialize_bridge.stop()
set_log_driver_config(100, 25)
self.log_driver.initialize_bridge(self.mock_bridge)
expected_calls = [mock.call.set_controller_rate_limit(100),