Browse Source

Set trusted port only once in iptables firewall driver

Patch [1] added configuration of forward rule for trusted ports in
iptables firewall driver.

This patch fixes issue with many "duplicate iptables rule detected"
warning messages due to try to add such forward rule each time when
trusted port is updated.
Now such rule is added only once for port.

[1] https://review.openstack.org/#/c/525607/

Change-Id: Ib816887f07f16b6ac865bb81d0f27f12d0b47dfb
Closes-Bug: #1754770
changes/81/551981/3
Sławek Kapłoński 4 years ago
parent
commit
8be0c2a551
  1. 9
      neutron/agent/linux/iptables_firewall.py
  2. 62
      neutron/tests/unit/agent/linux/test_iptables_firewall.py

9
neutron/agent/linux/iptables_firewall.py

@ -67,6 +67,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
# list of port which has security group
self.filtered_ports = {}
self.unfiltered_ports = {}
self.trusted_ports = []
self.ipconntrack = ip_conntrack.get_conntrack(
self.iptables.get_rules_for_table, self.filtered_ports,
self.unfiltered_ports, namespace=namespace,
@ -111,11 +112,15 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
def process_trusted_ports(self, port_ids):
"""Process ports that are trusted and shouldn't be filtered."""
for port in port_ids:
self._add_trusted_port_rules(port)
if port not in self.trusted_ports:
self._add_trusted_port_rules(port)
self.trusted_ports.append(port)
def remove_trusted_ports(self, port_ids):
for port in port_ids:
self._remove_trusted_port_rules(port)
if port in self.trusted_ports:
self._remove_trusted_port_rules(port)
self.trusted_ports.remove(port)
def _add_trusted_port_rules(self, port):
device = self._get_device_name(port)

62
neutron/tests/unit/agent/linux/test_iptables_firewall.py

@ -1080,6 +1080,68 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def _test_process_trusted_ports(self, configured):
port = self._fake_port()
port['id'] = 'tapfake_dev'
calls = [
mock.call.add_chain('sg-fallback'),
mock.call.add_rule('sg-fallback',
'-j DROP', comment=ic.UNMATCH_DROP)]
if configured:
self.firewall.trusted_ports.append(port['id'])
else:
calls.append(
mock.call.add_rule('FORWARD',
'-m physdev --physdev-out tapfake_dev '
'--physdev-is-bridged '
'-j ACCEPT', comment=ic.TRUSTED_ACCEPT))
filter_inst = self.v4filter_inst
self.firewall.process_trusted_ports([port['id']])
comb = zip(calls, filter_inst.mock_calls)
for (l, r) in comb:
self.assertEqual(l, r)
filter_inst.assert_has_calls(calls)
self.assertIn(port['id'], self.firewall.trusted_ports)
def test_process_trusted_ports(self):
self._test_process_trusted_ports(False)
def test_process_trusted_ports_already_configured(self):
self._test_process_trusted_ports(True)
def _test_remove_trusted_ports(self, configured):
port = self._fake_port()
port['id'] = 'tapfake_dev'
calls = [
mock.call.add_chain('sg-fallback'),
mock.call.add_rule('sg-fallback',
'-j DROP', comment=ic.UNMATCH_DROP)]
if configured:
self.firewall.trusted_ports.append(port['id'])
calls.append(
mock.call.remove_rule('FORWARD',
'-m physdev --physdev-out tapfake_dev '
'--physdev-is-bridged -j ACCEPT'))
filter_inst = self.v4filter_inst
self.firewall.remove_trusted_ports([port['id']])
comb = zip(calls, filter_inst.mock_calls)
for (l, r) in comb:
self.assertEqual(l, r)
filter_inst.assert_has_calls(calls)
self.assertNotIn(port['id'], self.firewall.trusted_ports)
def test_remove_trusted_ports(self):
self._test_remove_trusted_ports(True)
def test_process_remove_ports_not_configured(self):
self._test_remove_trusted_ports(False)
def _test_prepare_port_filter(self,
rule,
ingress_expected_call=None,

Loading…
Cancel
Save