Allow to process FW OF rules belonging to a port in a single operation
This patch adds a new configuration variable to control the OVS OpenFlow rule processing operations: * ``openflow_processed_per_port``: by default "False". If enabled, all OpenFlow rules associated to a port will be processed at once, in one single transaction. If disabled, the flows will be processed in batches of "AGENT_RES_PROCESSING_STEP=100" number of OpenFlow rules. With ``openflow_processed_per_port`` enabled, all Firewall OpenFlow rules related to a port are processed in one transaction (executed in one single command). That ensures the rules are written atomically and apply all of them at the same time. That means all needed rules to handle the ingress and egress traffic of a port using the Open vSwitch Firewall, are committed in the OVS DB at the same time. That will prevent from partially applied OpenFlow sets in the Firewall and inconsistencies when applying new SG rules or during the OVS agent restart. That will override, if needed, the hard limit of "AGENT_RES_PROCESSING_STEP=100" OpenFlow rules that could be processed in OVS at once. If the default configuration values are not modified, the behaviour of the OVS library does not change. Closes-Bug: #1934917 Change-Id: If4984dece266a789d607725f8497f1aac3d73d23
This commit is contained in:
parent
7c97ed50d0
commit
ab84b7fb2b
@ -16,7 +16,6 @@
|
||||
import collections
|
||||
import functools
|
||||
import itertools
|
||||
import operator
|
||||
import random
|
||||
import time
|
||||
import uuid
|
||||
@ -78,6 +77,9 @@ CTRL_BURST_LIMIT_MIN = 25
|
||||
# TODO(slaweq): move this to neutron_lib.constants
|
||||
TYPE_GRE_IP6 = 'ip6gre'
|
||||
|
||||
ActionFlowTuple = collections.namedtuple('ActionFlowTuple',
|
||||
['action', 'flow', 'flow_group_id'])
|
||||
|
||||
|
||||
def _ovsdb_result_pending(result):
|
||||
"""Return True if ovsdb indicates the result is still pending."""
|
||||
@ -251,6 +253,7 @@ class OVSBridge(BaseOVS):
|
||||
self.initial_protocols = {
|
||||
constants.OPENFLOW10, constants.OPENFLOW13, constants.OPENFLOW14}
|
||||
self.initial_protocols.add(self._highest_protocol_needed)
|
||||
self._flows_per_port = cfg.CONF.OVS.openflow_processed_per_port
|
||||
|
||||
@property
|
||||
def default_cookie(self):
|
||||
@ -468,7 +471,22 @@ class OVSBridge(BaseOVS):
|
||||
self.br_name)
|
||||
raise RuntimeError(_('No datapath_id on bridge %s') % self.br_name)
|
||||
|
||||
def do_action_flows(self, action, kwargs_list, use_bundle=False):
|
||||
def do_action_flows_by_group_id(self, action, flows_by_group_id,
|
||||
use_bundle=False):
|
||||
if self._flows_per_port:
|
||||
# Group flow actions per port.
|
||||
for flow_group_id, flows in flows_by_group_id.items():
|
||||
self.do_action_flows(action, flows, use_bundle=use_bundle,
|
||||
flow_group_id=flow_group_id)
|
||||
else:
|
||||
# Group all actions in one single list without any group ID
|
||||
# reference.
|
||||
flows = [item for _list in flows_by_group_id.values()
|
||||
for item in _list]
|
||||
self.do_action_flows(action, flows, use_bundle=use_bundle)
|
||||
|
||||
def do_action_flows(self, action, kwargs_list, use_bundle=False,
|
||||
flow_group_id=None):
|
||||
# we can't mix strict and non-strict, so we'll use the first kw
|
||||
# and check against other kw being different
|
||||
strict = kwargs_list[0].get('strict', False)
|
||||
@ -515,7 +533,15 @@ class OVSBridge(BaseOVS):
|
||||
if use_bundle:
|
||||
extra_param.append('--bundle')
|
||||
|
||||
step = common_constants.AGENT_RES_PROCESSING_STEP
|
||||
if flow_group_id:
|
||||
# NOTE(ralonsoh): all flows belonging to a port will be written
|
||||
# atomically in the same command.
|
||||
step = len(flow_strs)
|
||||
else:
|
||||
# No group ID defined (flows are not grouped per port). Use the
|
||||
# default batch step value "openflow_number_processing_step".
|
||||
step = common_constants.AGENT_RES_PROCESSING_STEP
|
||||
|
||||
for i in range(0, len(flow_strs), step):
|
||||
self.run_ofctl('%s-flows' % action, extra_param + ['-'],
|
||||
'\n'.join(flow_strs[i:i + step]))
|
||||
@ -1243,14 +1269,15 @@ class DeferredOVSBridge(object):
|
||||
return getattr(self.br, name)
|
||||
raise AttributeError(name)
|
||||
|
||||
def add_flow(self, **kwargs):
|
||||
self.action_flow_tuples.append(('add', kwargs))
|
||||
def add_flow(self, flow_group_id=None, **kwargs):
|
||||
self.action_flow_tuples.append(
|
||||
ActionFlowTuple('add', kwargs, flow_group_id))
|
||||
|
||||
def mod_flow(self, **kwargs):
|
||||
self.action_flow_tuples.append(('mod', kwargs))
|
||||
self.action_flow_tuples.append(ActionFlowTuple('mod', kwargs, None))
|
||||
|
||||
def delete_flows(self, **kwargs):
|
||||
self.action_flow_tuples.append(('del', kwargs))
|
||||
self.action_flow_tuples.append(ActionFlowTuple('del', kwargs, None))
|
||||
|
||||
def apply_flows(self):
|
||||
action_flow_tuples = self.action_flow_tuples
|
||||
@ -1259,14 +1286,16 @@ class DeferredOVSBridge(object):
|
||||
return
|
||||
|
||||
if not self.full_ordered:
|
||||
action_flow_tuples.sort(key=lambda af: self.weights[af[0]])
|
||||
action_flow_tuples.sort(key=lambda flow: self.weights[flow.action])
|
||||
|
||||
grouped = itertools.groupby(action_flow_tuples,
|
||||
key=operator.itemgetter(0))
|
||||
itemgetter_1 = operator.itemgetter(1)
|
||||
for action, action_flow_list in grouped:
|
||||
flows = list(map(itemgetter_1, action_flow_list))
|
||||
self.br.do_action_flows(action, flows, self.use_bundle)
|
||||
flows_by_action = itertools.groupby(action_flow_tuples,
|
||||
key=lambda af: af.action)
|
||||
for action, flows in flows_by_action:
|
||||
flows_by_group_id = collections.defaultdict(list)
|
||||
for flow in flows:
|
||||
flows_by_group_id[flow.flow_group_id].append(flow.flow)
|
||||
self.br.do_action_flows_by_group_id(action, flows_by_group_id,
|
||||
self.use_bundle)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
@ -406,7 +406,7 @@ class ConjIPFlowManager(object):
|
||||
|
||||
def _update_flows_for_vlan_subr(self, direction, ethertype, vlan_tag,
|
||||
flow_state, addr_to_conj,
|
||||
conj_id_to_remove):
|
||||
conj_id_to_remove, ofport):
|
||||
"""Do the actual flow updates for given direction and ethertype."""
|
||||
conj_id_to_remove = conj_id_to_remove or []
|
||||
# Delete any current flow related to any deleted IP address, before
|
||||
@ -450,9 +450,9 @@ class ConjIPFlowManager(object):
|
||||
continue
|
||||
for flow in rules.create_flows_for_ip_address(
|
||||
addr, direction, ethertype, vlan_tag, conj_ids):
|
||||
self.driver._add_flow(**flow)
|
||||
self.driver._add_flow(flow_group_id=ofport, **flow)
|
||||
|
||||
def update_flows_for_vlan(self, vlan_tag, conj_id_to_remove=None):
|
||||
def update_flows_for_vlan(self, vlan_tag, ofport, conj_id_to_remove=None):
|
||||
"""Install action=conjunction(conj_id, 1/2) flows,
|
||||
which depend on IP addresses of remote_group_id or
|
||||
remote_address_group_id.
|
||||
@ -466,7 +466,7 @@ class ConjIPFlowManager(object):
|
||||
self._update_flows_for_vlan_subr(
|
||||
direction, ethertype, vlan_tag,
|
||||
self.flow_state[vlan_tag][(direction, ethertype)],
|
||||
addr_to_conj, conj_id_to_remove)
|
||||
addr_to_conj, conj_id_to_remove, ofport)
|
||||
self.flow_state[vlan_tag][(direction, ethertype)] = addr_to_conj
|
||||
|
||||
def add(self, vlan_tag, sg_id, remote_id, direction, ethertype,
|
||||
@ -522,7 +522,7 @@ class ConjIPFlowManager(object):
|
||||
update = True
|
||||
|
||||
if update:
|
||||
self.update_flows_for_vlan(vlan_tag,
|
||||
self.update_flows_for_vlan(vlan_tag, None,
|
||||
conj_id_to_remove=conj_id_to_remove)
|
||||
|
||||
|
||||
@ -595,7 +595,18 @@ class OVSFirewallDriver(firewall.FirewallDriver):
|
||||
for f in rules.create_accept_flows(flow):
|
||||
self._add_flow(**f)
|
||||
|
||||
def _add_flow(self, **kwargs):
|
||||
def _add_flow(self, flow_group_id=None, **kwargs):
|
||||
"""Add a new flow.
|
||||
|
||||
Most of the port related flows will have the parameters "reg_port" or
|
||||
"in_port". If no "flow_group_id" is defined, "in_port" or "reg_port"
|
||||
will be used instead (those parameters store the port "ofport"). The
|
||||
flow group ID will be used to commit all flows related to a port in
|
||||
the same transaction (for deferred OVS bridge implementation only).
|
||||
"""
|
||||
flow_group_id = (flow_group_id or
|
||||
kwargs.get('in_port') or
|
||||
kwargs.get('reg_port'))
|
||||
dl_type = kwargs.get('dl_type')
|
||||
create_reg_numbers(kwargs)
|
||||
if isinstance(dl_type, int):
|
||||
@ -603,7 +614,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
|
||||
if self._update_cookie:
|
||||
kwargs['cookie'] = self._update_cookie
|
||||
if self._deferred:
|
||||
self.int_br.add_flow(**kwargs)
|
||||
self.int_br.add_flow(flow_group_id=flow_group_id, **kwargs)
|
||||
else:
|
||||
self.int_br.br.add_flow(**kwargs)
|
||||
|
||||
@ -889,6 +900,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
|
||||
actions += 'strip_vlan,resubmit(,{:d})'.format(
|
||||
ovs_consts.BASE_INGRESS_TABLE)
|
||||
self._add_flow(
|
||||
flow_group_id=ofport,
|
||||
table=ovs_consts.TRANSIENT_TABLE,
|
||||
priority=90,
|
||||
dl_dst=mac,
|
||||
@ -900,6 +912,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
|
||||
actions += 'resubmit(,{:d})'.format(
|
||||
ovs_consts.BASE_INGRESS_TABLE)
|
||||
self._add_flow(
|
||||
flow_group_id=ofport,
|
||||
table=ovs_consts.TRANSIENT_TABLE,
|
||||
priority=90,
|
||||
dl_dst=mac,
|
||||
@ -944,6 +957,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
|
||||
port.vlan_tag, port.network_type)
|
||||
|
||||
self._add_flow(
|
||||
flow_group_id=port.ofport,
|
||||
table=ovs_consts.TRANSIENT_TABLE,
|
||||
priority=90,
|
||||
dl_dst=mac_addr,
|
||||
@ -1198,6 +1212,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
|
||||
# and if not, accept it
|
||||
for mac_addr in port.all_allowed_macs:
|
||||
self._add_flow(
|
||||
flow_group_id=port.ofport,
|
||||
table=ovs_consts.ACCEPT_OR_INGRESS_TABLE,
|
||||
priority=100,
|
||||
dl_dst=mac_addr,
|
||||
@ -1240,6 +1255,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
|
||||
|
||||
# Prevent flood for accepted egress traffic
|
||||
self._add_flow(
|
||||
flow_group_id=dst_port,
|
||||
table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_NORMAL_TABLE,
|
||||
priority=12,
|
||||
dl_dst=mac,
|
||||
@ -1272,6 +1288,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
|
||||
|
||||
if patch_ofport is not ovs_lib.INVALID_OFPORT:
|
||||
self._add_flow(
|
||||
flow_group_id=dst_port,
|
||||
table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_NORMAL_TABLE,
|
||||
priority=10,
|
||||
dl_src=mac,
|
||||
@ -1296,6 +1313,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
|
||||
def _initialize_tracked_egress(self, port):
|
||||
# Drop invalid packets
|
||||
self._add_flow(
|
||||
flow_group_id=port.ofport,
|
||||
table=ovs_consts.RULES_EGRESS_TABLE,
|
||||
priority=50,
|
||||
ct_state=ovsfw_consts.OF_STATE_INVALID,
|
||||
@ -1411,6 +1429,7 @@ class OVSFirewallDriver(firewall.FirewallDriver):
|
||||
def _initialize_tracked_ingress(self, port):
|
||||
# Drop invalid packets
|
||||
self._add_flow(
|
||||
flow_group_id=port.ofport,
|
||||
table=ovs_consts.RULES_INGRESS_TABLE,
|
||||
priority=50,
|
||||
ct_state=ovsfw_consts.OF_STATE_INVALID,
|
||||
@ -1557,7 +1576,8 @@ class OVSFirewallDriver(firewall.FirewallDriver):
|
||||
|
||||
self._add_non_ip_conj_flows(port)
|
||||
|
||||
self.conj_ip_manager.update_flows_for_vlan(port.vlan_tag)
|
||||
self.conj_ip_manager.update_flows_for_vlan(port.vlan_tag,
|
||||
port.ofport)
|
||||
|
||||
def _create_rules_generator_for_port(self, port):
|
||||
for sec_group in port.sec_groups:
|
||||
|
@ -44,6 +44,15 @@ OPTS = [
|
||||
'unregistered multicast packets to all ports. '
|
||||
'The switch will send unregistered multicast packets '
|
||||
'only to ports connected to multicast routers.')),
|
||||
cfg.BoolOpt('openflow_processed_per_port',
|
||||
default=False,
|
||||
help=_('If enabled, all OpenFlow rules associated to a port '
|
||||
'are processed at once, in one single transaction. '
|
||||
'That avoids possible inconsistencies during OVS agent '
|
||||
'restart and port updates. '
|
||||
'If disabled, the flows will be processed in batches '
|
||||
'of "openflow_number_processing_step" number of '
|
||||
'OpenFlow rules.')),
|
||||
]
|
||||
|
||||
|
||||
|
@ -612,8 +612,8 @@ class TestDeferredOVSBridge(base.BaseTestCase):
|
||||
super(TestDeferredOVSBridge, self).setUp()
|
||||
|
||||
self.br = mock.Mock()
|
||||
self.mocked_do_action_flows = mock.patch.object(
|
||||
self.br, 'do_action_flows').start()
|
||||
self.mock_do_action_flows_by_group_id = mock.patch.object(
|
||||
self.br, 'do_action_flows_by_group_id').start()
|
||||
|
||||
self.add_flow_dict1 = dict(in_port=11, actions='drop')
|
||||
self.add_flow_dict2 = dict(in_port=12, actions='drop')
|
||||
@ -628,15 +628,15 @@ class TestDeferredOVSBridge(base.BaseTestCase):
|
||||
ovs_lib.DeferredOVSBridge.ALLOWED_PASSTHROUGHS)
|
||||
|
||||
def _verify_mock_call(self, expected_calls):
|
||||
self.mocked_do_action_flows.assert_has_calls(expected_calls)
|
||||
self.mock_do_action_flows_by_group_id.assert_has_calls(expected_calls)
|
||||
self.assertEqual(len(expected_calls),
|
||||
len(self.mocked_do_action_flows.mock_calls))
|
||||
len(self.mock_do_action_flows_by_group_id.mock_calls))
|
||||
|
||||
def test_apply_on_exit(self):
|
||||
expected_calls = [
|
||||
mock.call('add', [self.add_flow_dict1], False),
|
||||
mock.call('mod', [self.mod_flow_dict1], False),
|
||||
mock.call('del', [self.del_flow_dict1], False),
|
||||
mock.call('add', {None: [self.add_flow_dict1]}, False),
|
||||
mock.call('mod', {None: [self.mod_flow_dict1]}, False),
|
||||
mock.call('del', {None: [self.del_flow_dict1]}, False),
|
||||
]
|
||||
|
||||
with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
|
||||
@ -660,13 +660,16 @@ class TestDeferredOVSBridge(base.BaseTestCase):
|
||||
|
||||
def test_apply(self):
|
||||
expected_calls = [
|
||||
mock.call('add', [self.add_flow_dict1], False),
|
||||
mock.call('mod', [self.mod_flow_dict1], False),
|
||||
mock.call('del', [self.del_flow_dict1], False),
|
||||
mock.call('add',
|
||||
{11: [self.add_flow_dict1], 12: [self.add_flow_dict2]},
|
||||
False),
|
||||
mock.call('mod', {None: [self.mod_flow_dict1]}, False),
|
||||
mock.call('del', {None: [self.del_flow_dict1]}, False),
|
||||
]
|
||||
|
||||
with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
|
||||
deferred_br.add_flow(**self.add_flow_dict1)
|
||||
deferred_br.add_flow(flow_group_id=11, **self.add_flow_dict1)
|
||||
deferred_br.add_flow(flow_group_id=12, **self.add_flow_dict2)
|
||||
deferred_br.mod_flow(**self.mod_flow_dict1)
|
||||
deferred_br.delete_flows(**self.del_flow_dict1)
|
||||
self._verify_mock_call([])
|
||||
@ -676,12 +679,15 @@ class TestDeferredOVSBridge(base.BaseTestCase):
|
||||
|
||||
def test_apply_order(self):
|
||||
expected_calls = [
|
||||
mock.call(
|
||||
'del', [self.del_flow_dict1, self.del_flow_dict2], False),
|
||||
mock.call(
|
||||
'mod', [self.mod_flow_dict1, self.mod_flow_dict2], False),
|
||||
mock.call(
|
||||
'add', [self.add_flow_dict1, self.add_flow_dict2], False),
|
||||
mock.call('del',
|
||||
{None: [self.del_flow_dict1, self.del_flow_dict2]},
|
||||
False),
|
||||
mock.call('mod',
|
||||
{None: [self.mod_flow_dict1, self.mod_flow_dict2]},
|
||||
False),
|
||||
mock.call('add',
|
||||
{None: [self.add_flow_dict1, self.add_flow_dict2]},
|
||||
False),
|
||||
]
|
||||
|
||||
order = 'del', 'mod', 'add'
|
||||
@ -696,12 +702,13 @@ class TestDeferredOVSBridge(base.BaseTestCase):
|
||||
|
||||
def test_apply_full_ordered(self):
|
||||
expected_calls = [
|
||||
mock.call('add', [self.add_flow_dict1], False),
|
||||
mock.call('mod', [self.mod_flow_dict1], False),
|
||||
mock.call(
|
||||
'del', [self.del_flow_dict1, self.del_flow_dict2], False),
|
||||
mock.call('add', [self.add_flow_dict2], False),
|
||||
mock.call('mod', [self.mod_flow_dict2], False),
|
||||
mock.call('add', {None: [self.add_flow_dict1]}, False),
|
||||
mock.call('mod', {None: [self.mod_flow_dict1]}, False),
|
||||
mock.call('del',
|
||||
{None: [self.del_flow_dict1, self.del_flow_dict2]},
|
||||
False),
|
||||
mock.call('add', {None: [self.add_flow_dict2]}, False),
|
||||
mock.call('mod', {None: [self.mod_flow_dict2]}, False),
|
||||
]
|
||||
|
||||
with ovs_lib.DeferredOVSBridge(self.br,
|
||||
|
@ -423,7 +423,7 @@ class TestConjIPFlowManager(base.BaseTestCase):
|
||||
get_conj_id_mock.return_value = self.conj_id
|
||||
self.manager.add(self.vlan_tag, 'sg', 'remote_id',
|
||||
constants.INGRESS_DIRECTION, constants.IPv4, 0)
|
||||
self.manager.update_flows_for_vlan(self.vlan_tag)
|
||||
self.manager.update_flows_for_vlan(self.vlan_tag, mock.ANY)
|
||||
self.assertFalse(remote_group.get_ethertype_filtered_addresses.called)
|
||||
self.assertFalse(self.driver._add_flow.called)
|
||||
|
||||
@ -439,7 +439,7 @@ class TestConjIPFlowManager(base.BaseTestCase):
|
||||
get_conj_id_mock.return_value = self.conj_id
|
||||
self.manager.add(self.vlan_tag, 'sg', 'remote_id',
|
||||
constants.INGRESS_DIRECTION, constants.IPv4, 0)
|
||||
self.manager.update_flows_for_vlan(self.vlan_tag)
|
||||
self.manager.update_flows_for_vlan(self.vlan_tag, mock.ANY)
|
||||
self.assertTrue(remote_group.get_ethertype_filtered_addresses.called)
|
||||
self.assertTrue(self.driver._add_flow.called)
|
||||
|
||||
@ -454,20 +454,24 @@ class TestConjIPFlowManager(base.BaseTestCase):
|
||||
constants.INGRESS_DIRECTION, constants.IPv4, 0)
|
||||
self.manager.add(self.vlan_tag, 'sg', 'remote_id',
|
||||
constants.INGRESS_DIRECTION, constants.IPv4, 3)
|
||||
self.manager.update_flows_for_vlan(self.vlan_tag)
|
||||
self.manager.update_flows_for_vlan(self.vlan_tag, 'ofport1')
|
||||
self.assertEqual(self.driver._add_flow.call_args_list,
|
||||
[mock.call(actions='conjunction(16,1/2)', ct_state='+est-rel-rpl',
|
||||
dl_type=2048, nw_src='10.22.3.4/32', priority=70,
|
||||
reg_net=self.vlan_tag, table=82),
|
||||
reg_net=self.vlan_tag, table=82,
|
||||
flow_group_id='ofport1'),
|
||||
mock.call(actions='conjunction(17,1/2)', ct_state='+new-est',
|
||||
dl_type=2048, nw_src='10.22.3.4/32', priority=70,
|
||||
reg_net=self.vlan_tag, table=82),
|
||||
reg_net=self.vlan_tag, table=82,
|
||||
flow_group_id='ofport1'),
|
||||
mock.call(actions='conjunction(22,1/2)', ct_state='+est-rel-rpl',
|
||||
dl_type=2048, nw_src='10.22.3.4/32', priority=73,
|
||||
reg_net=self.vlan_tag, table=82),
|
||||
reg_net=self.vlan_tag, table=82,
|
||||
flow_group_id='ofport1'),
|
||||
mock.call(actions='conjunction(23,1/2)', ct_state='+new-est',
|
||||
dl_type=2048, nw_src='10.22.3.4/32', priority=73,
|
||||
reg_net=self.vlan_tag, table=82)])
|
||||
reg_net=self.vlan_tag, table=82,
|
||||
flow_group_id='ofport1')])
|
||||
|
||||
def _sg_removed(self, sg_name):
|
||||
with mock.patch.object(self.manager.conj_id_map,
|
||||
|
@ -0,0 +1,11 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Added a new configuration variable, in ``[OVS]`` section, to control
|
||||
the OVS OpenFlow rule processing operations when using the OVS native
|
||||
firewall driver (``securitygroup.firewall_driver=openvswitch``):
|
||||
|
||||
* ``openflow_processed_per_port``: by default "False". If enabled, all
|
||||
OpenFlow rules associated to a port will be processed at once, in a
|
||||
single transaction. If disabled, the flows will be processed in batches
|
||||
of "AGENT_RES_PROCESSING_STEP=100" number of OpenFlow rules.
|
Loading…
Reference in New Issue
Block a user