diff --git a/neutron/agent/linux/openvswitch_firewall/firewall.py b/neutron/agent/linux/openvswitch_firewall/firewall.py index 5d3ec14a96f..2f1e219c8d3 100644 --- a/neutron/agent/linux/openvswitch_firewall/firewall.py +++ b/neutron/agent/linux/openvswitch_firewall/firewall.py @@ -16,6 +16,8 @@ import collections import contextlib import copy +import itertools +import re import eventlet import netaddr @@ -41,6 +43,7 @@ from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants \ as ovs_consts LOG = logging.getLogger(__name__) +CONJ_ID_REGEX = re.compile(r"conj_id=(\d+),") def _replace_register(flow_params, register_number, register_value): @@ -262,33 +265,75 @@ class SGPortMap(object): class ConjIdMap(object): """Handle conjunction ID allocations and deallocations.""" - def __new__(cls): + CONJ_ID_BLOCK_SIZE = 8 + MAX_CONJ_ID = 2 ** 32 - 8 + + def __new__(cls, int_br): if not hasattr(cls, '_instance'): cls._instance = super(ConjIdMap, cls).__new__(cls) return cls._instance - def __init__(self): + def __init__(self, int_br): self.id_map = collections.defaultdict(self._conj_id_factory) # Stores the set of conjuntion IDs used for each unique tuple # (sg_id, remote_id, direction, ethertype). Each tuple # can have up to 8 conjuntion IDs (see ConjIPFlowManager.add()). self.id_map_group = collections.defaultdict(set) self.id_free = collections.deque() - self.max_id = 0 + self._max_id = self._init_max_id(int_br) + + def _init_max_id(self, int_br): + """Read the maximum conjunction ID number in the integration bridge + + This method will dump all integration bridge flows, parse them and + return the maximum conjunction ID number. By default, "int_br" is a + ``OVSAgentBridge`` instance, using "os-ken" library to access to the OF + rules. + If not, "int_br" will default to a ``OVSBridge`` instance. The CLI + command "ovs-ofctl" will be used instead. + + :param int_br: ``OVSAgentBridge`` or ``OVSBridge`` instance. + :returns: The maximum conjunction ID number in the integration bridge + """ + conj_id_max = 0 + try: + for flow in itertools.chain( + *[int_br.dump_flows(table) + for table in ovs_consts.OVS_FIREWALL_TABLES]): + conj_id_max = max(conj_id_max, flow.match.get('conj_id', 0)) + except AttributeError: # br_int is a ``OVSBridge`` instance. + flows_iter = itertools.chain( + *[int_br.dump_flows_for_table(table) + for table in ovs_consts.OVS_FIREWALL_TABLES]) + conj_ids = CONJ_ID_REGEX.findall(" | ".join(flows_iter)) + try: + conj_id_max = max([int(conj_id) for conj_id in conj_ids]) + except ValueError: + conj_id_max = 0 + + max_id = conj_id_max - conj_id_max % self.CONJ_ID_BLOCK_SIZE + return self._next_max_id(max_id) + + def _next_max_id(self, max_id): + max_id += self.CONJ_ID_BLOCK_SIZE + if max_id >= self.MAX_CONJ_ID: + max_id = 0 + return max_id def _conj_id_factory(self): # If there is any freed ID, use one. if self.id_free: return self.id_free.popleft() # Allocate new one. It must be divisible by 8. (See the next function.) - self.max_id += 8 - return self.max_id + self._max_id = self._next_max_id(self._max_id) + return self._max_id def get_conj_id(self, sg_id, remote_id, direction, ethertype): """Return a conjunction ID specified by the arguments. - Allocate one if necessary. The returned ID is divisible by 8, - as there are 4 priority levels (see rules.flow_priority_offset) - and 2 conjunction IDs are needed per priority. + Allocate one if necessary. The returned ID is divisible by 8 + (CONJ_ID_BLOCK_SIZE), as there are 4 priority levels + (see rules.flow_priority_offset) and 2 conjunction IDs are needed per + priority. """ if direction not in [lib_const.EGRESS_DIRECTION, lib_const.INGRESS_DIRECTION]: @@ -337,7 +382,7 @@ class ConjIPFlowManager(object): """ def __init__(self, driver): - self.conj_id_map = ConjIdMap() + self.conj_id_map = ConjIdMap(driver.int_br.br) self.driver = driver # The following two are dict of dicts and are indexed like: # self.x[vlan_tag][(direction, ethertype)] diff --git a/neutron/services/logapi/drivers/openvswitch/ovs_firewall_log.py b/neutron/services/logapi/drivers/openvswitch/ovs_firewall_log.py index e0dccb79782..e91ee174c40 100644 --- a/neutron/services/logapi/drivers/openvswitch/ovs_firewall_log.py +++ b/neutron/services/logapi/drivers/openvswitch/ovs_firewall_log.py @@ -145,7 +145,7 @@ class OVSFirewallLoggingDriver(log_ext.LoggingDriver): self.log_ports = collections.defaultdict(dict) self.cookies_table = set() self.cookie_ids_to_delete = set() - self.conj_id_map = ovsfw.ConjIdMap() + self.conj_id_map = ovsfw.ConjIdMap(self.int_br.br) def initialize(self, resource_rpc, **kwargs): self.resource_rpc = resource_rpc diff --git a/neutron/tests/unit/agent/linux/openvswitch_firewall/test_firewall.py b/neutron/tests/unit/agent/linux/openvswitch_firewall/test_firewall.py index f77d6dc6571..742b02c7074 100644 --- a/neutron/tests/unit/agent/linux/openvswitch_firewall/test_firewall.py +++ b/neutron/tests/unit/agent/linux/openvswitch_firewall/test_firewall.py @@ -19,6 +19,7 @@ from neutron_lib.callbacks import registry as callbacks_registry from neutron_lib.callbacks import resources as callbacks_resources from neutron_lib import constants from neutron_lib.utils import helpers +from os_ken.ofproto import ofproto_v1_3_parser from oslo_config import cfg import testtools @@ -37,6 +38,36 @@ from neutron.tests import base TESTING_VLAN_TAG = 1 TESTING_SEGMENT = 1000 +MATCH_1 = ofproto_v1_3_parser.OFPMatch( + _ordered_fields=[('conj_id', 100), ('eth_type', 2048), ('reg5', 12), + ('ct_state', (2, 14))]) +MATCH_2 = ofproto_v1_3_parser.OFPMatch( + _ordered_fields=[('conj_id', 200), ('eth_type', 2048), ('reg5', 12), + ('ct_state', (2, 14))]) +MATCH_3 = ofproto_v1_3_parser.OFPMatch( + _ordered_fields=[('reg5', 13), ('ct_state', (10, 14)), ('ct_zone', 1), + ('ct_mark', 0)]) +MATCH_4 = ofproto_v1_3_parser.OFPMatch( + _ordered_fields=[('eth_type', 34525), ('ip_proto', 58), + ('icmpv6_type', 136), ('reg5', 11)]) +INIT_OF_RULES = [ + mock.Mock(match=MATCH_1), + mock.Mock(match=MATCH_2), + mock.Mock(match=MATCH_3), + mock.Mock(match=MATCH_4), +] +INIT_OF_RULES_VSCTL = [ + 'priority=40,ct_state=+est,ip,reg5=0xd actions=ct(commit,zone=NXM_NX_REG6[' + '0..15],exec(load:0x1->NXM_NX_CT_MARK[]))', + 'priority=70,conj_id=100,ct_state=+est-rel-rpl,ip,reg5=0xc actions=load:0x' + 'f0->NXM_NX_REG7[],output:12', + 'priority=70,conj_id=200,ct_state=+est-rel-rpl,ipv6,reg5=0xc actions=load:' + '0xf8->NXM_NX_REG7[],output:12', + 'priority=73,ct_state=+est-rel-rpl,ip,reg6=0x1,nw_src=10.10.0.42 actions=c' + 'onjunction(118,1/2)', + 'priority=70,ct_state=+est-rel-rpl,ip,reg6=0x1,nw_src=10.10.0.61 actions=c' + 'onjunction(120,1/2)' +] def create_ofport(port_dict, network_type=None, @@ -272,7 +303,11 @@ class TestSGPortMap(base.BaseTestCase): class TestConjIdMap(base.BaseTestCase): def setUp(self): super(TestConjIdMap, self).setUp() - self.conj_id_map = ovsfw.ConjIdMap() + self.mock_int_br = mock.Mock() + self.dump_flows_ret = [[]] * len(ovs_consts.OVS_FIREWALL_TABLES) + self.dump_flows_ret[0] = INIT_OF_RULES + self.mock_int_br.dump_flows.side_effect = self.dump_flows_ret + self.conj_id_map = ovsfw.ConjIdMap(self.mock_int_br) def test_get_conj_id(self): allocated = [] @@ -293,6 +328,7 @@ class TestConjIdMap(base.BaseTestCase): constants.IPv6) def test_delete_sg(self): + self.conj_id_map._max_id = 0 test_data = [ # conj_id: 8 ('sg1', 'sg1', constants.INGRESS_DIRECTION, constants.IPv6, 0), @@ -338,11 +374,43 @@ class TestConjIdMap(base.BaseTestCase): reallocated.add(self.conj_id_map.get_conj_id(*conj_id_tuple)) self.assertEqual(reallocated, conj_id_segment) + def test__init_max_id_os_ken(self): + self.mock_int_br.dump_flows.side_effect = self.dump_flows_ret + self.assertEqual(208, self.conj_id_map._init_max_id(self.mock_int_br)) + + match = ofproto_v1_3_parser.OFPMatch( + _ordered_fields=[('conj_id', 237), ('eth_type', 2048), + ('reg5', 12), ('ct_state', (2, 14))]) + new_rule = mock.Mock(match=match) + self.dump_flows_ret[0] = INIT_OF_RULES + [new_rule] + self.mock_int_br.dump_flows.side_effect = self.dump_flows_ret + self.assertEqual(240, self.conj_id_map._init_max_id(self.mock_int_br)) + + def test__init_max_id_vsctl(self): + self.mock_int_br.dump_flows.side_effect = AttributeError() + dump_flows_ret = [[]] * len(ovs_consts.OVS_FIREWALL_TABLES) + dump_flows_ret[0] = INIT_OF_RULES_VSCTL + self.mock_int_br.dump_flows_for_table.side_effect = dump_flows_ret + + self.assertEqual(208, self.conj_id_map._init_max_id(self.mock_int_br)) + + new_rule = ('priority=70,conj_id=237,ct_state=+est-rel-rpl,ipv6,reg5=0' + 'xc actions=load:0xf8->NXM_NX_REG7[],output:12') + dump_flows_ret[0] = INIT_OF_RULES_VSCTL + [new_rule] + self.mock_int_br.dump_flows_for_table.side_effect = dump_flows_ret + self.assertEqual(240, self.conj_id_map._init_max_id(self.mock_int_br)) + + def test__next_max_id(self): + self.assertEqual(8, self.conj_id_map._next_max_id(0)) + self.assertEqual(0, self.conj_id_map._next_max_id( + self.conj_id_map.MAX_CONJ_ID - 1)) + class TestConjIPFlowManager(base.BaseTestCase): def setUp(self): super(TestConjIPFlowManager, self).setUp() self.driver = mock.Mock() + self.driver.int_br.br.dump_flows.return_value = INIT_OF_RULES self.manager = ovsfw.ConjIPFlowManager(self.driver) self.vlan_tag = 100 self.conj_id = 16 @@ -1164,6 +1232,8 @@ class TestCookieContext(base.BaseTestCase): ovsfw.OVSFirewallDriver, 'initialize_bridge', return_value=bridge.deferred( full_ordered=True, use_bundle=True)).start() + mock.patch.object(ovsfw.ConjIdMap, '_init_max_id', + return_value=0).start() securitygroups_rpc.register_securitygroups_opts() self.firewall = ovsfw.OVSFirewallDriver(bridge) diff --git a/neutron/tests/unit/services/logapi/drivers/openvswitch/test_ovs_firewall_log.py b/neutron/tests/unit/services/logapi/drivers/openvswitch/test_ovs_firewall_log.py index 2642142c96b..13cf4425dfc 100644 --- a/neutron/tests/unit/services/logapi/drivers/openvswitch/test_ovs_firewall_log.py +++ b/neutron/tests/unit/services/logapi/drivers/openvswitch/test_ovs_firewall_log.py @@ -102,6 +102,12 @@ class FakeOVSPort(object): class TestOVSFirewallLoggingDriver(base.BaseTestCase): def setUp(self): super(TestOVSFirewallLoggingDriver, self).setUp() + mock_int_br = mock.Mock() + mock_int_br.br.dump_flows.return_value = [] + self._mock_initialize_bridge = mock.patch.object( + ovsfw_log.OVSFirewallLoggingDriver, 'initialize_bridge', + return_value=mock_int_br) + self.mock_initialize_bridge = self._mock_initialize_bridge.start() self.log_driver = ovsfw_log.OVSFirewallLoggingDriver(mock.Mock()) resource_rpc_mock = mock.patch.object( agent_rpc, 'LoggingApiStub', autospec=True).start() @@ -132,10 +138,12 @@ class TestOVSFirewallLoggingDriver(base.BaseTestCase): return self.mock_bridge.br.get_vif_port_by_id.return_value.vif_mac def test_initialize_bridge(self): + self._mock_initialize_bridge.stop() br = self.log_driver.initialize_bridge(self.mock_bridge) self.assertEqual(self.mock_bridge.deferred.return_value, br) def test_set_controller_rate_limit(self): + self._mock_initialize_bridge.stop() set_log_driver_config(100, 25) self.log_driver.initialize_bridge(self.mock_bridge) expected_calls = [mock.call.set_controller_rate_limit(100),