diff --git a/neutron/agent/common/ovs_lib.py b/neutron/agent/common/ovs_lib.py index aa0c6f809c1..7e1409af727 100644 --- a/neutron/agent/common/ovs_lib.py +++ b/neutron/agent/common/ovs_lib.py @@ -16,7 +16,6 @@ import collections import functools import itertools -import operator import random import time import uuid @@ -78,6 +77,9 @@ CTRL_BURST_LIMIT_MIN = 25 # TODO(slaweq): move this to neutron_lib.constants TYPE_GRE_IP6 = 'ip6gre' +ActionFlowTuple = collections.namedtuple('ActionFlowTuple', + ['action', 'flow', 'flow_group_id']) + def _ovsdb_result_pending(result): """Return True if ovsdb indicates the result is still pending.""" @@ -251,6 +253,7 @@ class OVSBridge(BaseOVS): self.initial_protocols = { constants.OPENFLOW10, constants.OPENFLOW13, constants.OPENFLOW14} self.initial_protocols.add(self._highest_protocol_needed) + self._flows_per_port = cfg.CONF.OVS.openflow_processed_per_port @property def default_cookie(self): @@ -468,7 +471,22 @@ class OVSBridge(BaseOVS): self.br_name) raise RuntimeError(_('No datapath_id on bridge %s') % self.br_name) - def do_action_flows(self, action, kwargs_list, use_bundle=False): + def do_action_flows_by_group_id(self, action, flows_by_group_id, + use_bundle=False): + if self._flows_per_port: + # Group flow actions per port. + for flow_group_id, flows in flows_by_group_id.items(): + self.do_action_flows(action, flows, use_bundle=use_bundle, + flow_group_id=flow_group_id) + else: + # Group all actions in one single list without any group ID + # reference. + flows = [item for _list in flows_by_group_id.values() + for item in _list] + self.do_action_flows(action, flows, use_bundle=use_bundle) + + def do_action_flows(self, action, kwargs_list, use_bundle=False, + flow_group_id=None): # we can't mix strict and non-strict, so we'll use the first kw # and check against other kw being different strict = kwargs_list[0].get('strict', False) @@ -515,7 +533,15 @@ class OVSBridge(BaseOVS): if use_bundle: extra_param.append('--bundle') - step = common_constants.AGENT_RES_PROCESSING_STEP + if flow_group_id: + # NOTE(ralonsoh): all flows belonging to a port will be written + # atomically in the same command. + step = len(flow_strs) + else: + # No group ID defined (flows are not grouped per port). Use the + # default batch step value "openflow_number_processing_step". + step = common_constants.AGENT_RES_PROCESSING_STEP + for i in range(0, len(flow_strs), step): self.run_ofctl('%s-flows' % action, extra_param + ['-'], '\n'.join(flow_strs[i:i + step])) @@ -1243,14 +1269,15 @@ class DeferredOVSBridge(object): return getattr(self.br, name) raise AttributeError(name) - def add_flow(self, **kwargs): - self.action_flow_tuples.append(('add', kwargs)) + def add_flow(self, flow_group_id=None, **kwargs): + self.action_flow_tuples.append( + ActionFlowTuple('add', kwargs, flow_group_id)) def mod_flow(self, **kwargs): - self.action_flow_tuples.append(('mod', kwargs)) + self.action_flow_tuples.append(ActionFlowTuple('mod', kwargs, None)) def delete_flows(self, **kwargs): - self.action_flow_tuples.append(('del', kwargs)) + self.action_flow_tuples.append(ActionFlowTuple('del', kwargs, None)) def apply_flows(self): action_flow_tuples = self.action_flow_tuples @@ -1259,14 +1286,16 @@ class DeferredOVSBridge(object): return if not self.full_ordered: - action_flow_tuples.sort(key=lambda af: self.weights[af[0]]) + action_flow_tuples.sort(key=lambda flow: self.weights[flow.action]) - grouped = itertools.groupby(action_flow_tuples, - key=operator.itemgetter(0)) - itemgetter_1 = operator.itemgetter(1) - for action, action_flow_list in grouped: - flows = list(map(itemgetter_1, action_flow_list)) - self.br.do_action_flows(action, flows, self.use_bundle) + flows_by_action = itertools.groupby(action_flow_tuples, + key=lambda af: af.action) + for action, flows in flows_by_action: + flows_by_group_id = collections.defaultdict(list) + for flow in flows: + flows_by_group_id[flow.flow_group_id].append(flow.flow) + self.br.do_action_flows_by_group_id(action, flows_by_group_id, + self.use_bundle) def __enter__(self): return self diff --git a/neutron/agent/linux/openvswitch_firewall/firewall.py b/neutron/agent/linux/openvswitch_firewall/firewall.py index 4c9730b2e78..e5ada18a2ae 100644 --- a/neutron/agent/linux/openvswitch_firewall/firewall.py +++ b/neutron/agent/linux/openvswitch_firewall/firewall.py @@ -406,7 +406,7 @@ class ConjIPFlowManager(object): def _update_flows_for_vlan_subr(self, direction, ethertype, vlan_tag, flow_state, addr_to_conj, - conj_id_to_remove): + conj_id_to_remove, ofport): """Do the actual flow updates for given direction and ethertype.""" conj_id_to_remove = conj_id_to_remove or [] # Delete any current flow related to any deleted IP address, before @@ -450,9 +450,9 @@ class ConjIPFlowManager(object): continue for flow in rules.create_flows_for_ip_address( addr, direction, ethertype, vlan_tag, conj_ids): - self.driver._add_flow(**flow) + self.driver._add_flow(flow_group_id=ofport, **flow) - def update_flows_for_vlan(self, vlan_tag, conj_id_to_remove=None): + def update_flows_for_vlan(self, vlan_tag, ofport, conj_id_to_remove=None): """Install action=conjunction(conj_id, 1/2) flows, which depend on IP addresses of remote_group_id or remote_address_group_id. @@ -466,7 +466,7 @@ class ConjIPFlowManager(object): self._update_flows_for_vlan_subr( direction, ethertype, vlan_tag, self.flow_state[vlan_tag][(direction, ethertype)], - addr_to_conj, conj_id_to_remove) + addr_to_conj, conj_id_to_remove, ofport) self.flow_state[vlan_tag][(direction, ethertype)] = addr_to_conj def add(self, vlan_tag, sg_id, remote_id, direction, ethertype, @@ -522,7 +522,7 @@ class ConjIPFlowManager(object): update = True if update: - self.update_flows_for_vlan(vlan_tag, + self.update_flows_for_vlan(vlan_tag, None, conj_id_to_remove=conj_id_to_remove) @@ -595,7 +595,18 @@ class OVSFirewallDriver(firewall.FirewallDriver): for f in rules.create_accept_flows(flow): self._add_flow(**f) - def _add_flow(self, **kwargs): + def _add_flow(self, flow_group_id=None, **kwargs): + """Add a new flow. + + Most of the port related flows will have the parameters "reg_port" or + "in_port". If no "flow_group_id" is defined, "in_port" or "reg_port" + will be used instead (those parameters store the port "ofport"). The + flow group ID will be used to commit all flows related to a port in + the same transaction (for deferred OVS bridge implementation only). + """ + flow_group_id = (flow_group_id or + kwargs.get('in_port') or + kwargs.get('reg_port')) dl_type = kwargs.get('dl_type') create_reg_numbers(kwargs) if isinstance(dl_type, int): @@ -603,7 +614,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): if self._update_cookie: kwargs['cookie'] = self._update_cookie if self._deferred: - self.int_br.add_flow(**kwargs) + self.int_br.add_flow(flow_group_id=flow_group_id, **kwargs) else: self.int_br.br.add_flow(**kwargs) @@ -889,6 +900,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): actions += 'strip_vlan,resubmit(,{:d})'.format( ovs_consts.BASE_INGRESS_TABLE) self._add_flow( + flow_group_id=ofport, table=ovs_consts.TRANSIENT_TABLE, priority=90, dl_dst=mac, @@ -900,6 +912,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): actions += 'resubmit(,{:d})'.format( ovs_consts.BASE_INGRESS_TABLE) self._add_flow( + flow_group_id=ofport, table=ovs_consts.TRANSIENT_TABLE, priority=90, dl_dst=mac, @@ -944,6 +957,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): port.vlan_tag, port.network_type) self._add_flow( + flow_group_id=port.ofport, table=ovs_consts.TRANSIENT_TABLE, priority=90, dl_dst=mac_addr, @@ -1198,6 +1212,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): # and if not, accept it for mac_addr in port.all_allowed_macs: self._add_flow( + flow_group_id=port.ofport, table=ovs_consts.ACCEPT_OR_INGRESS_TABLE, priority=100, dl_dst=mac_addr, @@ -1240,6 +1255,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): # Prevent flood for accepted egress traffic self._add_flow( + flow_group_id=dst_port, table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_NORMAL_TABLE, priority=12, dl_dst=mac, @@ -1272,6 +1288,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): if patch_ofport is not ovs_lib.INVALID_OFPORT: self._add_flow( + flow_group_id=dst_port, table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_NORMAL_TABLE, priority=10, dl_src=mac, @@ -1296,6 +1313,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): def _initialize_tracked_egress(self, port): # Drop invalid packets self._add_flow( + flow_group_id=port.ofport, table=ovs_consts.RULES_EGRESS_TABLE, priority=50, ct_state=ovsfw_consts.OF_STATE_INVALID, @@ -1411,6 +1429,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): def _initialize_tracked_ingress(self, port): # Drop invalid packets self._add_flow( + flow_group_id=port.ofport, table=ovs_consts.RULES_INGRESS_TABLE, priority=50, ct_state=ovsfw_consts.OF_STATE_INVALID, @@ -1557,7 +1576,8 @@ class OVSFirewallDriver(firewall.FirewallDriver): self._add_non_ip_conj_flows(port) - self.conj_ip_manager.update_flows_for_vlan(port.vlan_tag) + self.conj_ip_manager.update_flows_for_vlan(port.vlan_tag, + port.ofport) def _create_rules_generator_for_port(self, port): for sec_group in port.sec_groups: diff --git a/neutron/conf/agent/ovs_conf.py b/neutron/conf/agent/ovs_conf.py index f8fa8ef2d0e..e707bbefd80 100644 --- a/neutron/conf/agent/ovs_conf.py +++ b/neutron/conf/agent/ovs_conf.py @@ -44,6 +44,15 @@ OPTS = [ 'unregistered multicast packets to all ports. ' 'The switch will send unregistered multicast packets ' 'only to ports connected to multicast routers.')), + cfg.BoolOpt('openflow_processed_per_port', + default=False, + help=_('If enabled, all OpenFlow rules associated to a port ' + 'are processed at once, in one single transaction. ' + 'That avoids possible inconsistencies during OVS agent ' + 'restart and port updates. ' + 'If disabled, the flows will be processed in batches ' + 'of "openflow_number_processing_step" number of ' + 'OpenFlow rules.')), ] diff --git a/neutron/tests/unit/agent/common/test_ovs_lib.py b/neutron/tests/unit/agent/common/test_ovs_lib.py index f43e32a3c21..fc2178ef723 100644 --- a/neutron/tests/unit/agent/common/test_ovs_lib.py +++ b/neutron/tests/unit/agent/common/test_ovs_lib.py @@ -612,8 +612,8 @@ class TestDeferredOVSBridge(base.BaseTestCase): super(TestDeferredOVSBridge, self).setUp() self.br = mock.Mock() - self.mocked_do_action_flows = mock.patch.object( - self.br, 'do_action_flows').start() + self.mock_do_action_flows_by_group_id = mock.patch.object( + self.br, 'do_action_flows_by_group_id').start() self.add_flow_dict1 = dict(in_port=11, actions='drop') self.add_flow_dict2 = dict(in_port=12, actions='drop') @@ -628,15 +628,15 @@ class TestDeferredOVSBridge(base.BaseTestCase): ovs_lib.DeferredOVSBridge.ALLOWED_PASSTHROUGHS) def _verify_mock_call(self, expected_calls): - self.mocked_do_action_flows.assert_has_calls(expected_calls) + self.mock_do_action_flows_by_group_id.assert_has_calls(expected_calls) self.assertEqual(len(expected_calls), - len(self.mocked_do_action_flows.mock_calls)) + len(self.mock_do_action_flows_by_group_id.mock_calls)) def test_apply_on_exit(self): expected_calls = [ - mock.call('add', [self.add_flow_dict1], False), - mock.call('mod', [self.mod_flow_dict1], False), - mock.call('del', [self.del_flow_dict1], False), + mock.call('add', {None: [self.add_flow_dict1]}, False), + mock.call('mod', {None: [self.mod_flow_dict1]}, False), + mock.call('del', {None: [self.del_flow_dict1]}, False), ] with ovs_lib.DeferredOVSBridge(self.br) as deferred_br: @@ -660,13 +660,16 @@ class TestDeferredOVSBridge(base.BaseTestCase): def test_apply(self): expected_calls = [ - mock.call('add', [self.add_flow_dict1], False), - mock.call('mod', [self.mod_flow_dict1], False), - mock.call('del', [self.del_flow_dict1], False), + mock.call('add', + {11: [self.add_flow_dict1], 12: [self.add_flow_dict2]}, + False), + mock.call('mod', {None: [self.mod_flow_dict1]}, False), + mock.call('del', {None: [self.del_flow_dict1]}, False), ] with ovs_lib.DeferredOVSBridge(self.br) as deferred_br: - deferred_br.add_flow(**self.add_flow_dict1) + deferred_br.add_flow(flow_group_id=11, **self.add_flow_dict1) + deferred_br.add_flow(flow_group_id=12, **self.add_flow_dict2) deferred_br.mod_flow(**self.mod_flow_dict1) deferred_br.delete_flows(**self.del_flow_dict1) self._verify_mock_call([]) @@ -676,12 +679,15 @@ class TestDeferredOVSBridge(base.BaseTestCase): def test_apply_order(self): expected_calls = [ - mock.call( - 'del', [self.del_flow_dict1, self.del_flow_dict2], False), - mock.call( - 'mod', [self.mod_flow_dict1, self.mod_flow_dict2], False), - mock.call( - 'add', [self.add_flow_dict1, self.add_flow_dict2], False), + mock.call('del', + {None: [self.del_flow_dict1, self.del_flow_dict2]}, + False), + mock.call('mod', + {None: [self.mod_flow_dict1, self.mod_flow_dict2]}, + False), + mock.call('add', + {None: [self.add_flow_dict1, self.add_flow_dict2]}, + False), ] order = 'del', 'mod', 'add' @@ -696,12 +702,13 @@ class TestDeferredOVSBridge(base.BaseTestCase): def test_apply_full_ordered(self): expected_calls = [ - mock.call('add', [self.add_flow_dict1], False), - mock.call('mod', [self.mod_flow_dict1], False), - mock.call( - 'del', [self.del_flow_dict1, self.del_flow_dict2], False), - mock.call('add', [self.add_flow_dict2], False), - mock.call('mod', [self.mod_flow_dict2], False), + mock.call('add', {None: [self.add_flow_dict1]}, False), + mock.call('mod', {None: [self.mod_flow_dict1]}, False), + mock.call('del', + {None: [self.del_flow_dict1, self.del_flow_dict2]}, + False), + mock.call('add', {None: [self.add_flow_dict2]}, False), + mock.call('mod', {None: [self.mod_flow_dict2]}, False), ] with ovs_lib.DeferredOVSBridge(self.br, diff --git a/neutron/tests/unit/agent/linux/openvswitch_firewall/test_firewall.py b/neutron/tests/unit/agent/linux/openvswitch_firewall/test_firewall.py index 0976b2f646d..a7acf8e1b50 100644 --- a/neutron/tests/unit/agent/linux/openvswitch_firewall/test_firewall.py +++ b/neutron/tests/unit/agent/linux/openvswitch_firewall/test_firewall.py @@ -423,7 +423,7 @@ class TestConjIPFlowManager(base.BaseTestCase): get_conj_id_mock.return_value = self.conj_id self.manager.add(self.vlan_tag, 'sg', 'remote_id', constants.INGRESS_DIRECTION, constants.IPv4, 0) - self.manager.update_flows_for_vlan(self.vlan_tag) + self.manager.update_flows_for_vlan(self.vlan_tag, mock.ANY) self.assertFalse(remote_group.get_ethertype_filtered_addresses.called) self.assertFalse(self.driver._add_flow.called) @@ -439,7 +439,7 @@ class TestConjIPFlowManager(base.BaseTestCase): get_conj_id_mock.return_value = self.conj_id self.manager.add(self.vlan_tag, 'sg', 'remote_id', constants.INGRESS_DIRECTION, constants.IPv4, 0) - self.manager.update_flows_for_vlan(self.vlan_tag) + self.manager.update_flows_for_vlan(self.vlan_tag, mock.ANY) self.assertTrue(remote_group.get_ethertype_filtered_addresses.called) self.assertTrue(self.driver._add_flow.called) @@ -454,20 +454,24 @@ class TestConjIPFlowManager(base.BaseTestCase): constants.INGRESS_DIRECTION, constants.IPv4, 0) self.manager.add(self.vlan_tag, 'sg', 'remote_id', constants.INGRESS_DIRECTION, constants.IPv4, 3) - self.manager.update_flows_for_vlan(self.vlan_tag) + self.manager.update_flows_for_vlan(self.vlan_tag, 'ofport1') self.assertEqual(self.driver._add_flow.call_args_list, [mock.call(actions='conjunction(16,1/2)', ct_state='+est-rel-rpl', dl_type=2048, nw_src='10.22.3.4/32', priority=70, - reg_net=self.vlan_tag, table=82), + reg_net=self.vlan_tag, table=82, + flow_group_id='ofport1'), mock.call(actions='conjunction(17,1/2)', ct_state='+new-est', dl_type=2048, nw_src='10.22.3.4/32', priority=70, - reg_net=self.vlan_tag, table=82), + reg_net=self.vlan_tag, table=82, + flow_group_id='ofport1'), mock.call(actions='conjunction(22,1/2)', ct_state='+est-rel-rpl', dl_type=2048, nw_src='10.22.3.4/32', priority=73, - reg_net=self.vlan_tag, table=82), + reg_net=self.vlan_tag, table=82, + flow_group_id='ofport1'), mock.call(actions='conjunction(23,1/2)', ct_state='+new-est', dl_type=2048, nw_src='10.22.3.4/32', priority=73, - reg_net=self.vlan_tag, table=82)]) + reg_net=self.vlan_tag, table=82, + flow_group_id='ofport1')]) def _sg_removed(self, sg_name): with mock.patch.object(self.manager.conj_id_map, diff --git a/releasenotes/notes/ovs-of-rules-processing-parameters-b38f7a1e88568798.yaml b/releasenotes/notes/ovs-of-rules-processing-parameters-b38f7a1e88568798.yaml new file mode 100644 index 00000000000..d3518d547ea --- /dev/null +++ b/releasenotes/notes/ovs-of-rules-processing-parameters-b38f7a1e88568798.yaml @@ -0,0 +1,11 @@ +--- +features: + - | + Added a new configuration variable, in ``[OVS]`` section, to control + the OVS OpenFlow rule processing operations when using the OVS native + firewall driver (``securitygroup.firewall_driver=openvswitch``): + + * ``openflow_processed_per_port``: by default "False". If enabled, all + OpenFlow rules associated to a port will be processed at once, in a + single transaction. If disabled, the flows will be processed in batches + of "AGENT_RES_PROCESSING_STEP=100" number of OpenFlow rules.