Merge "Allow to process FW OF rules belonging to a port in a single operation"

This commit is contained in:
Zuul 2022-05-10 16:48:47 +00:00 committed by Gerrit Code Review
commit dc854e8e1e
6 changed files with 132 additions and 52 deletions

View File

@ -16,7 +16,6 @@
import collections
import functools
import itertools
import operator
import random
import time
import uuid
@ -78,6 +77,9 @@ CTRL_BURST_LIMIT_MIN = 25
# TODO(slaweq): move this to neutron_lib.constants
TYPE_GRE_IP6 = 'ip6gre'
ActionFlowTuple = collections.namedtuple('ActionFlowTuple',
['action', 'flow', 'flow_group_id'])
def _ovsdb_result_pending(result):
"""Return True if ovsdb indicates the result is still pending."""
@ -251,6 +253,7 @@ class OVSBridge(BaseOVS):
self.initial_protocols = {
constants.OPENFLOW10, constants.OPENFLOW13, constants.OPENFLOW14}
self.initial_protocols.add(self._highest_protocol_needed)
self._flows_per_port = cfg.CONF.OVS.openflow_processed_per_port
@property
def default_cookie(self):
@ -468,7 +471,22 @@ class OVSBridge(BaseOVS):
self.br_name)
raise RuntimeError(_('No datapath_id on bridge %s') % self.br_name)
def do_action_flows(self, action, kwargs_list, use_bundle=False):
def do_action_flows_by_group_id(self, action, flows_by_group_id,
use_bundle=False):
if self._flows_per_port:
# Group flow actions per port.
for flow_group_id, flows in flows_by_group_id.items():
self.do_action_flows(action, flows, use_bundle=use_bundle,
flow_group_id=flow_group_id)
else:
# Group all actions in one single list without any group ID
# reference.
flows = [item for _list in flows_by_group_id.values()
for item in _list]
self.do_action_flows(action, flows, use_bundle=use_bundle)
def do_action_flows(self, action, kwargs_list, use_bundle=False,
flow_group_id=None):
# we can't mix strict and non-strict, so we'll use the first kw
# and check against other kw being different
strict = kwargs_list[0].get('strict', False)
@ -515,7 +533,15 @@ class OVSBridge(BaseOVS):
if use_bundle:
extra_param.append('--bundle')
step = common_constants.AGENT_RES_PROCESSING_STEP
if flow_group_id:
# NOTE(ralonsoh): all flows belonging to a port will be written
# atomically in the same command.
step = len(flow_strs)
else:
# No group ID defined (flows are not grouped per port). Use the
# default batch step value "openflow_number_processing_step".
step = common_constants.AGENT_RES_PROCESSING_STEP
for i in range(0, len(flow_strs), step):
self.run_ofctl('%s-flows' % action, extra_param + ['-'],
'\n'.join(flow_strs[i:i + step]))
@ -1243,14 +1269,15 @@ class DeferredOVSBridge(object):
return getattr(self.br, name)
raise AttributeError(name)
def add_flow(self, **kwargs):
self.action_flow_tuples.append(('add', kwargs))
def add_flow(self, flow_group_id=None, **kwargs):
self.action_flow_tuples.append(
ActionFlowTuple('add', kwargs, flow_group_id))
def mod_flow(self, **kwargs):
self.action_flow_tuples.append(('mod', kwargs))
self.action_flow_tuples.append(ActionFlowTuple('mod', kwargs, None))
def delete_flows(self, **kwargs):
self.action_flow_tuples.append(('del', kwargs))
self.action_flow_tuples.append(ActionFlowTuple('del', kwargs, None))
def apply_flows(self):
action_flow_tuples = self.action_flow_tuples
@ -1259,14 +1286,16 @@ class DeferredOVSBridge(object):
return
if not self.full_ordered:
action_flow_tuples.sort(key=lambda af: self.weights[af[0]])
action_flow_tuples.sort(key=lambda flow: self.weights[flow.action])
grouped = itertools.groupby(action_flow_tuples,
key=operator.itemgetter(0))
itemgetter_1 = operator.itemgetter(1)
for action, action_flow_list in grouped:
flows = list(map(itemgetter_1, action_flow_list))
self.br.do_action_flows(action, flows, self.use_bundle)
flows_by_action = itertools.groupby(action_flow_tuples,
key=lambda af: af.action)
for action, flows in flows_by_action:
flows_by_group_id = collections.defaultdict(list)
for flow in flows:
flows_by_group_id[flow.flow_group_id].append(flow.flow)
self.br.do_action_flows_by_group_id(action, flows_by_group_id,
self.use_bundle)
def __enter__(self):
return self

View File

@ -406,7 +406,7 @@ class ConjIPFlowManager(object):
def _update_flows_for_vlan_subr(self, direction, ethertype, vlan_tag,
flow_state, addr_to_conj,
conj_id_to_remove):
conj_id_to_remove, ofport):
"""Do the actual flow updates for given direction and ethertype."""
conj_id_to_remove = conj_id_to_remove or []
# Delete any current flow related to any deleted IP address, before
@ -450,9 +450,9 @@ class ConjIPFlowManager(object):
continue
for flow in rules.create_flows_for_ip_address(
addr, direction, ethertype, vlan_tag, conj_ids):
self.driver._add_flow(**flow)
self.driver._add_flow(flow_group_id=ofport, **flow)
def update_flows_for_vlan(self, vlan_tag, conj_id_to_remove=None):
def update_flows_for_vlan(self, vlan_tag, ofport, conj_id_to_remove=None):
"""Install action=conjunction(conj_id, 1/2) flows,
which depend on IP addresses of remote_group_id or
remote_address_group_id.
@ -466,7 +466,7 @@ class ConjIPFlowManager(object):
self._update_flows_for_vlan_subr(
direction, ethertype, vlan_tag,
self.flow_state[vlan_tag][(direction, ethertype)],
addr_to_conj, conj_id_to_remove)
addr_to_conj, conj_id_to_remove, ofport)
self.flow_state[vlan_tag][(direction, ethertype)] = addr_to_conj
def add(self, vlan_tag, sg_id, remote_id, direction, ethertype,
@ -522,7 +522,7 @@ class ConjIPFlowManager(object):
update = True
if update:
self.update_flows_for_vlan(vlan_tag,
self.update_flows_for_vlan(vlan_tag, None,
conj_id_to_remove=conj_id_to_remove)
@ -595,7 +595,18 @@ class OVSFirewallDriver(firewall.FirewallDriver):
for f in rules.create_accept_flows(flow):
self._add_flow(**f)
def _add_flow(self, **kwargs):
def _add_flow(self, flow_group_id=None, **kwargs):
"""Add a new flow.
Most of the port related flows will have the parameters "reg_port" or
"in_port". If no "flow_group_id" is defined, "in_port" or "reg_port"
will be used instead (those parameters store the port "ofport"). The
flow group ID will be used to commit all flows related to a port in
the same transaction (for deferred OVS bridge implementation only).
"""
flow_group_id = (flow_group_id or
kwargs.get('in_port') or
kwargs.get('reg_port'))
dl_type = kwargs.get('dl_type')
create_reg_numbers(kwargs)
if isinstance(dl_type, int):
@ -603,7 +614,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
if self._update_cookie:
kwargs['cookie'] = self._update_cookie
if self._deferred:
self.int_br.add_flow(**kwargs)
self.int_br.add_flow(flow_group_id=flow_group_id, **kwargs)
else:
self.int_br.br.add_flow(**kwargs)
@ -889,6 +900,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
actions += 'strip_vlan,resubmit(,{:d})'.format(
ovs_consts.BASE_INGRESS_TABLE)
self._add_flow(
flow_group_id=ofport,
table=ovs_consts.TRANSIENT_TABLE,
priority=90,
dl_dst=mac,
@ -900,6 +912,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
actions += 'resubmit(,{:d})'.format(
ovs_consts.BASE_INGRESS_TABLE)
self._add_flow(
flow_group_id=ofport,
table=ovs_consts.TRANSIENT_TABLE,
priority=90,
dl_dst=mac,
@ -944,6 +957,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
port.vlan_tag, port.network_type)
self._add_flow(
flow_group_id=port.ofport,
table=ovs_consts.TRANSIENT_TABLE,
priority=90,
dl_dst=mac_addr,
@ -1198,6 +1212,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
# and if not, accept it
for mac_addr in port.all_allowed_macs:
self._add_flow(
flow_group_id=port.ofport,
table=ovs_consts.ACCEPT_OR_INGRESS_TABLE,
priority=100,
dl_dst=mac_addr,
@ -1240,6 +1255,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
# Prevent flood for accepted egress traffic
self._add_flow(
flow_group_id=dst_port,
table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_NORMAL_TABLE,
priority=12,
dl_dst=mac,
@ -1272,6 +1288,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
if patch_ofport is not ovs_lib.INVALID_OFPORT:
self._add_flow(
flow_group_id=dst_port,
table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_NORMAL_TABLE,
priority=10,
dl_src=mac,
@ -1296,6 +1313,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
def _initialize_tracked_egress(self, port):
# Drop invalid packets
self._add_flow(
flow_group_id=port.ofport,
table=ovs_consts.RULES_EGRESS_TABLE,
priority=50,
ct_state=ovsfw_consts.OF_STATE_INVALID,
@ -1411,6 +1429,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
def _initialize_tracked_ingress(self, port):
# Drop invalid packets
self._add_flow(
flow_group_id=port.ofport,
table=ovs_consts.RULES_INGRESS_TABLE,
priority=50,
ct_state=ovsfw_consts.OF_STATE_INVALID,
@ -1557,7 +1576,8 @@ class OVSFirewallDriver(firewall.FirewallDriver):
self._add_non_ip_conj_flows(port)
self.conj_ip_manager.update_flows_for_vlan(port.vlan_tag)
self.conj_ip_manager.update_flows_for_vlan(port.vlan_tag,
port.ofport)
def _create_rules_generator_for_port(self, port):
for sec_group in port.sec_groups:

View File

@ -44,6 +44,15 @@ OPTS = [
'unregistered multicast packets to all ports. '
'The switch will send unregistered multicast packets '
'only to ports connected to multicast routers.')),
cfg.BoolOpt('openflow_processed_per_port',
default=False,
help=_('If enabled, all OpenFlow rules associated to a port '
'are processed at once, in one single transaction. '
'That avoids possible inconsistencies during OVS agent '
'restart and port updates. '
'If disabled, the flows will be processed in batches '
'of "openflow_number_processing_step" number of '
'OpenFlow rules.')),
]

View File

@ -612,8 +612,8 @@ class TestDeferredOVSBridge(base.BaseTestCase):
super(TestDeferredOVSBridge, self).setUp()
self.br = mock.Mock()
self.mocked_do_action_flows = mock.patch.object(
self.br, 'do_action_flows').start()
self.mock_do_action_flows_by_group_id = mock.patch.object(
self.br, 'do_action_flows_by_group_id').start()
self.add_flow_dict1 = dict(in_port=11, actions='drop')
self.add_flow_dict2 = dict(in_port=12, actions='drop')
@ -628,15 +628,15 @@ class TestDeferredOVSBridge(base.BaseTestCase):
ovs_lib.DeferredOVSBridge.ALLOWED_PASSTHROUGHS)
def _verify_mock_call(self, expected_calls):
self.mocked_do_action_flows.assert_has_calls(expected_calls)
self.mock_do_action_flows_by_group_id.assert_has_calls(expected_calls)
self.assertEqual(len(expected_calls),
len(self.mocked_do_action_flows.mock_calls))
len(self.mock_do_action_flows_by_group_id.mock_calls))
def test_apply_on_exit(self):
expected_calls = [
mock.call('add', [self.add_flow_dict1], False),
mock.call('mod', [self.mod_flow_dict1], False),
mock.call('del', [self.del_flow_dict1], False),
mock.call('add', {None: [self.add_flow_dict1]}, False),
mock.call('mod', {None: [self.mod_flow_dict1]}, False),
mock.call('del', {None: [self.del_flow_dict1]}, False),
]
with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
@ -660,13 +660,16 @@ class TestDeferredOVSBridge(base.BaseTestCase):
def test_apply(self):
expected_calls = [
mock.call('add', [self.add_flow_dict1], False),
mock.call('mod', [self.mod_flow_dict1], False),
mock.call('del', [self.del_flow_dict1], False),
mock.call('add',
{11: [self.add_flow_dict1], 12: [self.add_flow_dict2]},
False),
mock.call('mod', {None: [self.mod_flow_dict1]}, False),
mock.call('del', {None: [self.del_flow_dict1]}, False),
]
with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
deferred_br.add_flow(**self.add_flow_dict1)
deferred_br.add_flow(flow_group_id=11, **self.add_flow_dict1)
deferred_br.add_flow(flow_group_id=12, **self.add_flow_dict2)
deferred_br.mod_flow(**self.mod_flow_dict1)
deferred_br.delete_flows(**self.del_flow_dict1)
self._verify_mock_call([])
@ -676,12 +679,15 @@ class TestDeferredOVSBridge(base.BaseTestCase):
def test_apply_order(self):
expected_calls = [
mock.call(
'del', [self.del_flow_dict1, self.del_flow_dict2], False),
mock.call(
'mod', [self.mod_flow_dict1, self.mod_flow_dict2], False),
mock.call(
'add', [self.add_flow_dict1, self.add_flow_dict2], False),
mock.call('del',
{None: [self.del_flow_dict1, self.del_flow_dict2]},
False),
mock.call('mod',
{None: [self.mod_flow_dict1, self.mod_flow_dict2]},
False),
mock.call('add',
{None: [self.add_flow_dict1, self.add_flow_dict2]},
False),
]
order = 'del', 'mod', 'add'
@ -696,12 +702,13 @@ class TestDeferredOVSBridge(base.BaseTestCase):
def test_apply_full_ordered(self):
expected_calls = [
mock.call('add', [self.add_flow_dict1], False),
mock.call('mod', [self.mod_flow_dict1], False),
mock.call(
'del', [self.del_flow_dict1, self.del_flow_dict2], False),
mock.call('add', [self.add_flow_dict2], False),
mock.call('mod', [self.mod_flow_dict2], False),
mock.call('add', {None: [self.add_flow_dict1]}, False),
mock.call('mod', {None: [self.mod_flow_dict1]}, False),
mock.call('del',
{None: [self.del_flow_dict1, self.del_flow_dict2]},
False),
mock.call('add', {None: [self.add_flow_dict2]}, False),
mock.call('mod', {None: [self.mod_flow_dict2]}, False),
]
with ovs_lib.DeferredOVSBridge(self.br,

View File

@ -423,7 +423,7 @@ class TestConjIPFlowManager(base.BaseTestCase):
get_conj_id_mock.return_value = self.conj_id
self.manager.add(self.vlan_tag, 'sg', 'remote_id',
constants.INGRESS_DIRECTION, constants.IPv4, 0)
self.manager.update_flows_for_vlan(self.vlan_tag)
self.manager.update_flows_for_vlan(self.vlan_tag, mock.ANY)
self.assertFalse(remote_group.get_ethertype_filtered_addresses.called)
self.assertFalse(self.driver._add_flow.called)
@ -439,7 +439,7 @@ class TestConjIPFlowManager(base.BaseTestCase):
get_conj_id_mock.return_value = self.conj_id
self.manager.add(self.vlan_tag, 'sg', 'remote_id',
constants.INGRESS_DIRECTION, constants.IPv4, 0)
self.manager.update_flows_for_vlan(self.vlan_tag)
self.manager.update_flows_for_vlan(self.vlan_tag, mock.ANY)
self.assertTrue(remote_group.get_ethertype_filtered_addresses.called)
self.assertTrue(self.driver._add_flow.called)
@ -454,20 +454,24 @@ class TestConjIPFlowManager(base.BaseTestCase):
constants.INGRESS_DIRECTION, constants.IPv4, 0)
self.manager.add(self.vlan_tag, 'sg', 'remote_id',
constants.INGRESS_DIRECTION, constants.IPv4, 3)
self.manager.update_flows_for_vlan(self.vlan_tag)
self.manager.update_flows_for_vlan(self.vlan_tag, 'ofport1')
self.assertEqual(self.driver._add_flow.call_args_list,
[mock.call(actions='conjunction(16,1/2)', ct_state='+est-rel-rpl',
dl_type=2048, nw_src='10.22.3.4/32', priority=70,
reg_net=self.vlan_tag, table=82),
reg_net=self.vlan_tag, table=82,
flow_group_id='ofport1'),
mock.call(actions='conjunction(17,1/2)', ct_state='+new-est',
dl_type=2048, nw_src='10.22.3.4/32', priority=70,
reg_net=self.vlan_tag, table=82),
reg_net=self.vlan_tag, table=82,
flow_group_id='ofport1'),
mock.call(actions='conjunction(22,1/2)', ct_state='+est-rel-rpl',
dl_type=2048, nw_src='10.22.3.4/32', priority=73,
reg_net=self.vlan_tag, table=82),
reg_net=self.vlan_tag, table=82,
flow_group_id='ofport1'),
mock.call(actions='conjunction(23,1/2)', ct_state='+new-est',
dl_type=2048, nw_src='10.22.3.4/32', priority=73,
reg_net=self.vlan_tag, table=82)])
reg_net=self.vlan_tag, table=82,
flow_group_id='ofport1')])
def _sg_removed(self, sg_name):
with mock.patch.object(self.manager.conj_id_map,

View File

@ -0,0 +1,11 @@
---
features:
- |
Added a new configuration variable, in ``[OVS]`` section, to control
the OVS OpenFlow rule processing operations when using the OVS native
firewall driver (``securitygroup.firewall_driver=openvswitch``):
* ``openflow_processed_per_port``: by default "False". If enabled, all
OpenFlow rules associated to a port will be processed at once, in a
single transaction. If disabled, the flows will be processed in batches
of "AGENT_RES_PROCESSING_STEP=100" number of OpenFlow rules.