neutron/neutron/tests/unit/agent/linux/test_iptables_manager.py
Hong Hui Xiao 24f95f4877 Move address scope specific code out of iptables_manager
iptables_manager will be used by many features including security
groups, FWaaS, metering. The address scope specific code should be
moved out of iptables_manager, so that other feature will not get
the iptables rules that they will not use. For example, dhcp namespace
will not have the address scope iptables rules.

The change to the test code to adapt the change at [1], has also been
reverted in this patch. Instead, a couple of new test cases are added.

[1] https://review.openstack.org/#/c/270001/

Change-Id: Ifc8e7a381f8ab005a9e0216532cc7d0e7378c025
Closes-Bug: #1549513
2016-05-06 08:15:00 +00:00

1102 lines
44 KiB
Python

# Copyright 2012 Locaweb.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import mock
from oslo_config import cfg
import testtools
from neutron._i18n import _
from neutron.agent.linux import iptables_comments as ic
from neutron.agent.linux import iptables_manager
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.tests import base
from neutron.tests import tools
IPTABLES_ARG = {'bn': iptables_manager.binary_name,
'snat_out_comment': ic.SNAT_OUT,
'filter_rules': '',
'mark': constants.ROUTER_MARK_MASK}
NAT_TEMPLATE = ('# Generated by iptables_manager\n'
'*nat\n'
':OUTPUT - [0:0]\n'
':POSTROUTING - [0:0]\n'
':PREROUTING - [0:0]\n'
':neutron-postrouting-bottom - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-POSTROUTING - [0:0]\n'
':%(bn)s-PREROUTING - [0:0]\n'
':%(bn)s-float-snat - [0:0]\n'
':%(bn)s-snat - [0:0]\n'
'-I OUTPUT 1 -j %(bn)s-OUTPUT\n'
'-I POSTROUTING 1 -j %(bn)s-POSTROUTING\n'
'-I POSTROUTING 2 -j neutron-postrouting-bottom\n'
'-I PREROUTING 1 -j %(bn)s-PREROUTING\n'
'-I neutron-postrouting-bottom 1 -j %(bn)s-snat\n'
'-I %(bn)s-snat 1 -j '
'%(bn)s-float-snat\n'
'COMMIT\n'
'# Completed by iptables_manager\n')
NAT_DUMP = NAT_TEMPLATE % IPTABLES_ARG
FILTER_TEMPLATE = ('# Generated by iptables_manager\n'
'*filter\n'
':FORWARD - [0:0]\n'
':INPUT - [0:0]\n'
':OUTPUT - [0:0]\n'
':neutron-filter-top - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-local - [0:0]\n'
'-I FORWARD 1 -j neutron-filter-top\n'
'-I FORWARD 2 -j %(bn)s-FORWARD\n'
'-I INPUT 1 -j %(bn)s-INPUT\n'
'-I OUTPUT 1 -j neutron-filter-top\n'
'-I OUTPUT 2 -j %(bn)s-OUTPUT\n'
'-I neutron-filter-top 1 -j %(bn)s-local\n'
'COMMIT\n'
'# Completed by iptables_manager\n')
FILTER_DUMP = FILTER_TEMPLATE % IPTABLES_ARG
FILTER_WITH_RULES_TEMPLATE = (
'# Generated by iptables_manager\n'
'*filter\n'
':FORWARD - [0:0]\n'
':INPUT - [0:0]\n'
':OUTPUT - [0:0]\n'
':neutron-filter-top - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-filter - [0:0]\n'
':%(bn)s-local - [0:0]\n'
'-I FORWARD 1 -j neutron-filter-top\n'
'-I FORWARD 2 -j %(bn)s-FORWARD\n'
'-I INPUT 1 -j %(bn)s-INPUT\n'
'-I OUTPUT 1 -j neutron-filter-top\n'
'-I OUTPUT 2 -j %(bn)s-OUTPUT\n'
'-I neutron-filter-top 1 -j %(bn)s-local\n'
'%(filter_rules)s'
'COMMIT\n'
'# Completed by iptables_manager\n')
COMMENTED_NAT_DUMP = (
'# Generated by iptables_manager\n'
'*nat\n'
':OUTPUT - [0:0]\n'
':POSTROUTING - [0:0]\n'
':PREROUTING - [0:0]\n'
':neutron-postrouting-bottom - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-POSTROUTING - [0:0]\n'
':%(bn)s-PREROUTING - [0:0]\n'
':%(bn)s-float-snat - [0:0]\n'
':%(bn)s-snat - [0:0]\n'
'-I OUTPUT 1 -j %(bn)s-OUTPUT\n'
'-I POSTROUTING 1 -j %(bn)s-POSTROUTING\n'
'-I POSTROUTING 2 -j neutron-postrouting-bottom\n'
'-I PREROUTING 1 -j %(bn)s-PREROUTING\n'
'-I neutron-postrouting-bottom 1 '
'-m comment --comment "%(snat_out_comment)s" -j %(bn)s-snat\n'
'-I %(bn)s-snat 1 -j '
'%(bn)s-float-snat\n'
'COMMIT\n'
'# Completed by iptables_manager\n' % IPTABLES_ARG)
TRAFFIC_COUNTERS_DUMP = (
'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
' pkts bytes target prot opt in out source'
' destination \n'
' 400 65901 chain1 all -- * * 0.0.0.0/0'
' 0.0.0.0/0 \n'
' 400 65901 chain2 all -- * * 0.0.0.0/0'
' 0.0.0.0/0 \n')
class IptablesTestCase(base.BaseTestCase):
def test_get_binary_name_in_unittest(self):
# Corresponds to sys.argv content when running python -m unittest class
with mock.patch('sys.argv', ['python -m unittest', 'class']):
binary_name = iptables_manager.get_binary_name()
self.assertEqual('python_-m_unitte', binary_name)
class IptablesCommentsTestCase(base.BaseTestCase):
def setUp(self):
super(IptablesCommentsTestCase, self).setUp()
cfg.CONF.set_override('comment_iptables_rules', True, 'AGENT')
self.iptables = iptables_manager.IptablesManager()
self.execute = mock.patch.object(self.iptables, "execute").start()
def test_comments_short_enough(self):
for attr in dir(ic):
if not attr.startswith('__') and len(getattr(ic, attr)) > 255:
self.fail("Iptables comment %s is longer than 255 characters."
% attr)
def test_reordering_of_jump_rule_comments(self):
# jump at the start
self.assertEqual(
'-m comment --comment "aloha" -j sg-chain',
iptables_manager.comment_rule('-j sg-chain', 'aloha'))
# jump in the middle
self.assertEqual(
'-s source -m comment --comment "aloha" -j sg-chain',
iptables_manager.comment_rule('-s source -j sg-chain', 'aloha'))
# no jump rule
self.assertEqual(
'-s source -m comment --comment "aloha"',
iptables_manager.comment_rule('-s source', 'aloha'))
def test_add_filter_rule(self):
iptables_args = {}
iptables_args.update(IPTABLES_ARG)
filter_rules = ('-I %(bn)s-INPUT 1 -s 0/0 -d 192.168.0.2 -j '
'%(bn)s-filter\n-I %(bn)s-filter 1 -j DROP\n'
% iptables_args)
iptables_args['filter_rules'] = filter_rules
filter_dump_mod = FILTER_WITH_RULES_TEMPLATE % iptables_args
raw_dump = _generate_raw_dump(IPTABLES_ARG)
mangle_dump = _generate_mangle_dump(IPTABLES_ARG)
expected_calls_and_values = [
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(filter_dump_mod + mangle_dump +
COMMENTED_NAT_DUMP + raw_dump),
run_as_root=True),
None),
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(FILTER_DUMP + mangle_dump +
COMMENTED_NAT_DUMP + raw_dump),
run_as_root=True
),
None),
]
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain('filter')
self.iptables.ipv4['filter'].add_rule('filter', '-j DROP')
self.iptables.ipv4['filter'].add_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' %(bn)s-filter' % IPTABLES_ARG)
self.iptables.apply()
self.iptables.ipv4['filter'].remove_rule('filter', '-j DROP')
self.iptables.ipv4['filter'].remove_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' %(bn)s-filter'
% IPTABLES_ARG)
self.iptables.ipv4['filter'].remove_chain('filter')
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def _generate_mangle_dump(iptables_args):
return ('# Generated by iptables_manager\n'
'*mangle\n'
':FORWARD - [0:0]\n'
':INPUT - [0:0]\n'
':OUTPUT - [0:0]\n'
':POSTROUTING - [0:0]\n'
':PREROUTING - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-POSTROUTING - [0:0]\n'
':%(bn)s-PREROUTING - [0:0]\n'
':%(bn)s-mark - [0:0]\n'
'-I FORWARD 1 -j %(bn)s-FORWARD\n'
'-I INPUT 1 -j %(bn)s-INPUT\n'
'-I OUTPUT 1 -j %(bn)s-OUTPUT\n'
'-I POSTROUTING 1 -j %(bn)s-POSTROUTING\n'
'-I PREROUTING 1 -j %(bn)s-PREROUTING\n'
'-I %(bn)s-PREROUTING 1 -j %(bn)s-mark\n'
'COMMIT\n'
'# Completed by iptables_manager\n' % iptables_args)
def _generate_mangle_dump_v6(iptables_args):
return ('# Generated by iptables_manager\n'
'*mangle\n'
':FORWARD - [0:0]\n'
':INPUT - [0:0]\n'
':OUTPUT - [0:0]\n'
':POSTROUTING - [0:0]\n'
':PREROUTING - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-POSTROUTING - [0:0]\n'
':%(bn)s-PREROUTING - [0:0]\n'
'-I FORWARD 1 -j %(bn)s-FORWARD\n'
'-I INPUT 1 -j %(bn)s-INPUT\n'
'-I OUTPUT 1 -j %(bn)s-OUTPUT\n'
'-I POSTROUTING 1 -j %(bn)s-POSTROUTING\n'
'-I PREROUTING 1 -j %(bn)s-PREROUTING\n'
'COMMIT\n'
'# Completed by iptables_manager\n' % iptables_args)
def _generate_raw_dump(iptables_args):
return ('# Generated by iptables_manager\n'
'*raw\n'
':OUTPUT - [0:0]\n'
':PREROUTING - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-PREROUTING - [0:0]\n'
'-I OUTPUT 1 -j %(bn)s-OUTPUT\n'
'-I PREROUTING 1 -j %(bn)s-PREROUTING\n'
'COMMIT\n'
'# Completed by iptables_manager\n' % iptables_args)
MANGLE_DUMP = _generate_mangle_dump(IPTABLES_ARG)
MANGLE_DUMP_V6 = _generate_mangle_dump_v6(IPTABLES_ARG)
RAW_DUMP = _generate_raw_dump(IPTABLES_ARG)
class IptablesManagerStateFulTestCase(base.BaseTestCase):
def setUp(self):
super(IptablesManagerStateFulTestCase, self).setUp()
cfg.CONF.set_override('comment_iptables_rules', False, 'AGENT')
self.iptables = iptables_manager.IptablesManager()
self.execute = mock.patch.object(self.iptables, "execute").start()
def test_binary_name(self):
expected = os.path.basename(sys.argv[0])[:16]
self.assertEqual(expected, iptables_manager.binary_name)
def test_get_chain_name(self):
name = '0123456789' * 5
# 28 chars is the maximum length of iptables chain name.
self.assertEqual(iptables_manager.get_chain_name(name, wrap=False),
name[:28])
# 11 chars is the maximum length of chain name of iptable_manager
# if binary_name is prepended.
self.assertEqual(iptables_manager.get_chain_name(name, wrap=True),
name[:11])
def test_defer_apply_with_exception(self):
self.iptables._apply = mock.Mock(side_effect=Exception)
with testtools.ExpectedException(n_exc.IpTablesApplyException):
with self.iptables.defer_apply():
pass
def _extend_with_ip6tables_filter(self, expected_calls, filter_dump):
expected_calls.insert(2, (
mock.call(['ip6tables-save'],
run_as_root=True),
''))
expected_calls.insert(3, (
mock.call(['ip6tables-restore', '-n'],
process_input=filter_dump,
run_as_root=True),
None))
expected_calls.extend([
(mock.call(['ip6tables-save'],
run_as_root=True),
''),
(mock.call(['ip6tables-restore', '-n'],
process_input=filter_dump,
run_as_root=True),
None)])
def _test_add_and_remove_chain_custom_binary_name_helper(self, use_ipv6):
bn = ("xbcdef" * 5)
self.iptables = iptables_manager.IptablesManager(
binary_name=bn,
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
iptables_args = {'bn': bn[:16], 'filter_rules': ''}
filter_dump = FILTER_WITH_RULES_TEMPLATE % iptables_args
filter_dump_ipv6 = FILTER_TEMPLATE % iptables_args
filter_dump_mod = filter_dump
nat_dump = NAT_TEMPLATE % iptables_args
raw_dump = _generate_raw_dump(iptables_args)
mangle_dump = _generate_mangle_dump(iptables_args)
expected_calls_and_values = [
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(filter_dump_mod + mangle_dump +
nat_dump + raw_dump),
run_as_root=True),
None),
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(filter_dump + mangle_dump +
nat_dump + raw_dump),
run_as_root=True),
None),
]
if use_ipv6:
mangle_dump_v6 = _generate_mangle_dump_v6(iptables_args)
self._extend_with_ip6tables_filter(
expected_calls_and_values,
filter_dump_ipv6 + mangle_dump_v6 + raw_dump)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain('filter')
self.iptables.apply()
self.iptables.ipv4['filter'].empty_chain('filter')
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_add_and_remove_chain_custom_binary_name(self):
self._test_add_and_remove_chain_custom_binary_name_helper(False)
def test_add_and_remove_chain_custom_binary_name_with_ipv6(self):
self._test_add_and_remove_chain_custom_binary_name_helper(True)
def _test_empty_chain_custom_binary_name_helper(self, use_ipv6):
bn = ("xbcdef" * 5)[:16]
self.iptables = iptables_manager.IptablesManager(
binary_name=bn,
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
iptables_args = {'bn': bn}
filter_dump = FILTER_TEMPLATE % iptables_args
filter_rules = ('-I %(bn)s-filter 1 -s 0/0 -d 192.168.0.2\n'
% iptables_args)
iptables_args['filter_rules'] = filter_rules
filter_dump_mod = FILTER_WITH_RULES_TEMPLATE % iptables_args
nat_dump = NAT_TEMPLATE % iptables_args
raw_dump = _generate_raw_dump(iptables_args)
mangle_dump = _generate_mangle_dump(iptables_args)
expected_calls_and_values = [
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(filter_dump_mod + mangle_dump +
nat_dump + raw_dump),
run_as_root=True),
None),
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(filter_dump + mangle_dump +
nat_dump + raw_dump),
run_as_root=True),
None),
]
if use_ipv6:
mangle_dump_v6 = _generate_mangle_dump_v6(iptables_args)
self._extend_with_ip6tables_filter(
expected_calls_and_values,
filter_dump + mangle_dump_v6 + raw_dump)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain('filter')
self.iptables.ipv4['filter'].add_rule('filter',
'-s 0/0 -d 192.168.0.2')
self.iptables.apply()
self.iptables.ipv4['filter'].remove_chain('filter')
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_empty_chain_custom_binary_name(self):
self._test_empty_chain_custom_binary_name_helper(False)
def test_empty_chain_custom_binary_name_with_ipv6(self):
self._test_empty_chain_custom_binary_name_helper(True)
def _test_add_and_remove_chain_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
filter_dump_mod = FILTER_WITH_RULES_TEMPLATE % IPTABLES_ARG
expected_calls_and_values = [
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(filter_dump_mod + MANGLE_DUMP +
NAT_DUMP + RAW_DUMP),
run_as_root=True),
None),
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(FILTER_DUMP + MANGLE_DUMP + NAT_DUMP +
RAW_DUMP),
run_as_root=True),
None),
]
if use_ipv6:
self._extend_with_ip6tables_filter(
expected_calls_and_values,
FILTER_DUMP + MANGLE_DUMP_V6 + RAW_DUMP)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain('filter')
self.iptables.apply()
self.iptables.ipv4['filter'].remove_chain('filter')
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_add_and_remove_chain(self):
self._test_add_and_remove_chain_helper(False)
def test_add_and_remove_chain_with_ipv6(self):
self._test_add_and_remove_chain_helper(True)
def _test_add_filter_rule_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
iptables_args = {}
iptables_args.update(IPTABLES_ARG)
filter_rules = ('-I %(bn)s-INPUT 1 -s 0/0 -d 192.168.0.2 -j '
'%(bn)s-filter\n-I %(bn)s-filter 1 -j DROP\n'
% iptables_args)
iptables_args['filter_rules'] = filter_rules
filter_dump_mod = FILTER_WITH_RULES_TEMPLATE % iptables_args
raw_dump = RAW_DUMP % IPTABLES_ARG
expected_calls_and_values = [
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(filter_dump_mod + MANGLE_DUMP +
NAT_DUMP + RAW_DUMP),
run_as_root=True),
None),
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(FILTER_DUMP + MANGLE_DUMP + NAT_DUMP +
RAW_DUMP),
run_as_root=True
),
None),
]
if use_ipv6:
self._extend_with_ip6tables_filter(
expected_calls_and_values,
FILTER_DUMP + MANGLE_DUMP_V6 + raw_dump)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain('filter')
self.iptables.ipv4['filter'].add_rule('filter', '-j DROP')
self.iptables.ipv4['filter'].add_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' %(bn)s-filter' % IPTABLES_ARG)
self.iptables.apply()
self.iptables.ipv4['filter'].remove_rule('filter', '-j DROP')
self.iptables.ipv4['filter'].remove_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' %(bn)s-filter'
% IPTABLES_ARG)
self.iptables.ipv4['filter'].remove_chain('filter')
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_add_filter_rule(self):
self._test_add_filter_rule_helper(False)
def test_add_filter_rule_with_ipv6(self):
self._test_add_filter_rule_helper(True)
def _test_rule_with_wrap_target_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
name = '0123456789' * 5
wrap = "%s-%s" % (iptables_manager.binary_name,
iptables_manager.get_chain_name(name))
iptables_args = {'bn': iptables_manager.binary_name,
'wrap': wrap}
filter_dump_mod = ('# Generated by iptables_manager\n'
'*filter\n'
':FORWARD - [0:0]\n'
':INPUT - [0:0]\n'
':OUTPUT - [0:0]\n'
':neutron-filter-top - [0:0]\n'
':%(wrap)s - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-local - [0:0]\n'
'-I FORWARD 1 -j neutron-filter-top\n'
'-I FORWARD 2 -j %(bn)s-FORWARD\n'
'-I INPUT 1 -j %(bn)s-INPUT\n'
'-I OUTPUT 1 -j neutron-filter-top\n'
'-I OUTPUT 2 -j %(bn)s-OUTPUT\n'
'-I neutron-filter-top 1 -j %(bn)s-local\n'
'-I %(bn)s-INPUT 1 -s 0/0 -d 192.168.0.2 -j '
'%(wrap)s\n'
'COMMIT\n'
'# Completed by iptables_manager\n'
% iptables_args)
raw_dump = RAW_DUMP % IPTABLES_ARG
expected_calls_and_values = [
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(filter_dump_mod + MANGLE_DUMP +
NAT_DUMP + RAW_DUMP),
run_as_root=True),
None),
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(FILTER_DUMP + MANGLE_DUMP +
NAT_DUMP + RAW_DUMP),
run_as_root=True),
None),
]
if use_ipv6:
self._extend_with_ip6tables_filter(
expected_calls_and_values,
FILTER_DUMP + MANGLE_DUMP_V6 + raw_dump)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain(name)
self.iptables.ipv4['filter'].add_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' $%s' % name)
self.iptables.apply()
self.iptables.ipv4['filter'].remove_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' $%s' % name)
self.iptables.ipv4['filter'].remove_chain(name)
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_rule_with_wrap_target(self):
self._test_rule_with_wrap_target_helper(False)
def test_rule_with_wrap_target_with_ipv6(self):
self._test_rule_with_wrap_target_helper(True)
def _test_add_mangle_rule_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
mangle_dump_mod = (
'# Generated by iptables_manager\n'
'*mangle\n'
':FORWARD - [0:0]\n'
':INPUT - [0:0]\n'
':OUTPUT - [0:0]\n'
':POSTROUTING - [0:0]\n'
':PREROUTING - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-POSTROUTING - [0:0]\n'
':%(bn)s-PREROUTING - [0:0]\n'
':%(bn)s-mangle - [0:0]\n'
':%(bn)s-mark - [0:0]\n'
'-I FORWARD 1 -j %(bn)s-FORWARD\n'
'-I INPUT 1 -j %(bn)s-INPUT\n'
'-I OUTPUT 1 -j %(bn)s-OUTPUT\n'
'-I POSTROUTING 1 -j %(bn)s-POSTROUTING\n'
'-I PREROUTING 1 -j %(bn)s-PREROUTING\n'
'-I %(bn)s-PREROUTING 1 -j %(bn)s-mark\n'
'-I %(bn)s-PREROUTING 2 -j MARK --set-xmark 0x1/%(mark)s\n'
'COMMIT\n'
'# Completed by iptables_manager\n' % IPTABLES_ARG)
expected_calls_and_values = [
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(FILTER_DUMP + mangle_dump_mod +
NAT_DUMP + RAW_DUMP),
run_as_root=True),
None),
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(FILTER_DUMP + MANGLE_DUMP +
NAT_DUMP + RAW_DUMP),
run_as_root=True),
None),
]
if use_ipv6:
self._extend_with_ip6tables_filter(
expected_calls_and_values,
FILTER_DUMP + MANGLE_DUMP_V6 + RAW_DUMP)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['mangle'].add_chain('mangle')
self.iptables.ipv4['mangle'].add_rule(
'PREROUTING',
'-j MARK --set-xmark 0x1/%s' % constants.ROUTER_MARK_MASK)
self.iptables.apply()
self.iptables.ipv4['mangle'].remove_rule(
'PREROUTING',
'-j MARK --set-xmark 0x1/%s' % constants.ROUTER_MARK_MASK)
self.iptables.ipv4['mangle'].remove_chain('mangle')
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_add_mangle_rule(self):
self._test_add_mangle_rule_helper(False)
def test_add_mangle_rule_with_ipv6(self):
self._test_add_mangle_rule_helper(True)
def _test_add_nat_rule_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
nat_dump = NAT_TEMPLATE % IPTABLES_ARG
nat_dump_mod = ('# Generated by iptables_manager\n'
'*nat\n'
':OUTPUT - [0:0]\n'
':POSTROUTING - [0:0]\n'
':PREROUTING - [0:0]\n'
':neutron-postrouting-bottom - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-POSTROUTING - [0:0]\n'
':%(bn)s-PREROUTING - [0:0]\n'
':%(bn)s-float-snat - [0:0]\n'
':%(bn)s-nat - [0:0]\n'
':%(bn)s-snat - [0:0]\n'
'-I OUTPUT 1 -j %(bn)s-OUTPUT\n'
'-I POSTROUTING 1 -j %(bn)s-POSTROUTING\n'
'-I POSTROUTING 2 -j neutron-postrouting-bottom\n'
'-I PREROUTING 1 -j %(bn)s-PREROUTING\n'
'-I neutron-postrouting-bottom 1 -j %(bn)s-snat\n'
'-I %(bn)s-PREROUTING 1 -d 192.168.0.3 -j '
'%(bn)s-nat\n'
'-I %(bn)s-nat 1 -p tcp --dport 8080 -j '
'REDIRECT --to-port 80\n'
'-I %(bn)s-snat 1 -j %(bn)s-float-snat\n'
'COMMIT\n'
'# Completed by iptables_manager\n' % IPTABLES_ARG)
raw_dump = RAW_DUMP % IPTABLES_ARG
expected_calls_and_values = [
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(FILTER_DUMP + MANGLE_DUMP +
nat_dump_mod + RAW_DUMP),
run_as_root=True),
None),
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(FILTER_DUMP + MANGLE_DUMP + nat_dump +
RAW_DUMP),
run_as_root=True),
None),
]
if use_ipv6:
self._extend_with_ip6tables_filter(
expected_calls_and_values,
FILTER_DUMP + MANGLE_DUMP_V6 + raw_dump)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['nat'].add_chain('nat')
self.iptables.ipv4['nat'].add_rule('PREROUTING',
'-d 192.168.0.3 -j '
'%(bn)s-nat' % IPTABLES_ARG)
self.iptables.ipv4['nat'].add_rule('nat',
'-p tcp --dport 8080' +
' -j REDIRECT --to-port 80')
self.iptables.apply()
self.iptables.ipv4['nat'].remove_rule('nat',
'-p tcp --dport 8080 -j'
' REDIRECT --to-port 80')
self.iptables.ipv4['nat'].remove_rule('PREROUTING',
'-d 192.168.0.3 -j '
'%(bn)s-nat' % IPTABLES_ARG)
self.iptables.ipv4['nat'].remove_chain('nat')
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_add_nat_rule(self):
self._test_add_nat_rule_helper(False)
def test_add_nat_rule_with_ipv6(self):
self._test_add_nat_rule_helper(True)
def _test_add_raw_rule_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
raw_dump_mod = ('# Generated by iptables_manager\n'
'*raw\n'
':OUTPUT - [0:0]\n'
':PREROUTING - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-PREROUTING - [0:0]\n'
':%(bn)s-raw - [0:0]\n'
'-I OUTPUT 1 -j %(bn)s-OUTPUT\n'
'-I PREROUTING 1 -j %(bn)s-PREROUTING\n'
'-I %(bn)s-PREROUTING 1 -j CT --notrack\n'
'COMMIT\n'
'# Completed by iptables_manager\n'
% IPTABLES_ARG)
expected_calls_and_values = [
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(FILTER_DUMP + MANGLE_DUMP + NAT_DUMP +
raw_dump_mod),
run_as_root=True),
None),
(mock.call(['iptables-save'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-n'],
process_input=(FILTER_DUMP + MANGLE_DUMP + NAT_DUMP +
RAW_DUMP),
run_as_root=True),
None),
]
if use_ipv6:
self._extend_with_ip6tables_filter(
expected_calls_and_values,
FILTER_DUMP + MANGLE_DUMP_V6 + RAW_DUMP)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['raw'].add_chain('raw')
self.iptables.ipv4['raw'].add_rule('PREROUTING',
'-j CT --notrack')
self.iptables.apply()
self.iptables.ipv4['raw'].remove_rule('PREROUTING',
'-j CT --notrack')
self.iptables.ipv4['raw'].remove_chain('raw')
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
def test_add_raw_rule(self):
self._test_add_raw_rule_helper(False)
def test_add_raw_rule_with_ipv6(self):
self._test_add_raw_rule_helper(True)
def test_add_rule_to_a_nonexistent_chain(self):
self.assertRaises(LookupError, self.iptables.ipv4['filter'].add_rule,
'nonexistent', '-j DROP')
def test_remove_nonexistent_chain(self):
with mock.patch.object(iptables_manager, "LOG") as log:
self.iptables.ipv4['filter'].remove_chain('nonexistent')
log.debug.assert_called_once_with(
'Attempted to remove chain %s which does not exist',
'nonexistent')
def test_remove_nonexistent_rule(self):
with mock.patch.object(iptables_manager, "LOG") as log:
self.iptables.ipv4['filter'].remove_rule('nonexistent', '-j DROP')
log.warning.assert_called_once_with(
'Tried to remove rule that was not there: '
'%(chain)r %(rule)r %(wrap)r %(top)r',
{'wrap': True, 'top': False, 'rule': '-j DROP',
'chain': 'nonexistent'})
def test_iptables_failure_with_no_failing_line_number(self):
with mock.patch.object(iptables_manager, "LOG") as log:
# generate Runtime errors on iptables-restore calls
def iptables_restore_failer(*args, **kwargs):
if 'iptables-restore' in args[0]:
self.input_lines = kwargs['process_input'].split('\n')
# don't provide a specific failure message so all lines
# are logged
raise RuntimeError()
return FILTER_DUMP
self.execute.side_effect = iptables_restore_failer
# _apply_synchronized calls iptables-restore so it should raise
# a RuntimeError
self.assertRaises(RuntimeError,
self.iptables._apply_synchronized)
# The RuntimeError should have triggered a log of the input to the
# process that it failed to execute. Verify by comparing the log
# call to the 'process_input' arg given to the failed iptables-restore
# call.
# Failure without a specific line number in the error should cause
# all lines to be logged with numbers.
logged = ['%7d. %s' % (n, l)
for n, l in enumerate(self.input_lines, 1)]
log.error.assert_called_once_with(_(
'IPTablesManager.apply failed to apply the '
'following set of iptables rules:\n%s'),
'\n'.join(logged)
)
def test_iptables_failure_on_specific_line(self):
with mock.patch.object(iptables_manager, "LOG") as log:
# generate Runtime errors on iptables-restore calls
def iptables_restore_failer(*args, **kwargs):
if 'iptables-restore' in args[0]:
self.input_lines = kwargs['process_input'].split('\n')
# pretend line 11 failed
msg = ("Exit code: 1\nStdout: ''\n"
"Stderr: 'iptables-restore: line 11 failed\n'")
raise RuntimeError(msg)
return FILTER_DUMP
self.execute.side_effect = iptables_restore_failer
# _apply_synchronized calls iptables-restore so it should raise
# a RuntimeError
self.assertRaises(RuntimeError,
self.iptables._apply_synchronized)
# The RuntimeError should have triggered a log of the input to the
# process that it failed to execute. Verify by comparing the log
# call to the 'process_input' arg given to the failed iptables-restore
# call.
# Line 11 of the input was marked as failing so lines (11 - context)
# to (11 + context) should be logged
ctx = iptables_manager.IPTABLES_ERROR_LINES_OF_CONTEXT
log_start = max(0, 11 - ctx)
log_end = 11 + ctx
logged = ['%7d. %s' % (n, l)
for n, l in enumerate(self.input_lines[log_start:log_end],
log_start + 1)]
log.error.assert_called_once_with(_(
'IPTablesManager.apply failed to apply the '
'following set of iptables rules:\n%s'),
'\n'.join(logged)
)
def test_get_traffic_counters_chain_notexists(self):
with mock.patch.object(iptables_manager, "LOG") as log:
acc = self.iptables.get_traffic_counters('chain1')
self.assertIsNone(acc)
self.assertEqual(0, self.execute.call_count)
log.warning.assert_called_once_with(
'Attempted to get traffic counters of chain %s which '
'does not exist', 'chain1')
def _test_get_traffic_counters_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
exp_packets = 800
exp_bytes = 131802
expected_calls_and_values = [
(mock.call(['iptables', '-t', 'filter', '-L', 'OUTPUT',
'-n', '-v', '-x'],
run_as_root=True),
TRAFFIC_COUNTERS_DUMP),
(mock.call(['iptables', '-t', 'raw', '-L', 'OUTPUT', '-n',
'-v', '-x'],
run_as_root=True),
''),
(mock.call(['iptables', '-t', 'mangle', '-L', 'OUTPUT', '-n',
'-v', '-x'],
run_as_root=True),
''),
(mock.call(['iptables', '-t', 'nat', '-L', 'OUTPUT', '-n',
'-v', '-x'],
run_as_root=True),
''),
]
if use_ipv6:
expected_calls_and_values.append(
(mock.call(['ip6tables', '-t', 'raw', '-L', 'OUTPUT',
'-n', '-v', '-x'], run_as_root=True),
''))
expected_calls_and_values.append(
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
'-n', '-v', '-x'],
run_as_root=True),
TRAFFIC_COUNTERS_DUMP))
expected_calls_and_values.append(
(mock.call(['ip6tables', '-t', 'mangle', '-L', 'OUTPUT',
'-n', '-v', '-x'], run_as_root=True),
''))
exp_packets *= 2
exp_bytes *= 2
tools.setup_mock_calls(self.execute, expected_calls_and_values)
acc = self.iptables.get_traffic_counters('OUTPUT')
self.assertEqual(acc['pkts'], exp_packets)
self.assertEqual(acc['bytes'], exp_bytes)
tools.verify_mock_calls(self.execute, expected_calls_and_values,
any_order=True)
def test_get_traffic_counters(self):
self._test_get_traffic_counters_helper(False)
def test_get_traffic_counters_with_ipv6(self):
self._test_get_traffic_counters_helper(True)
def _test_get_traffic_counters_with_zero_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
exp_packets = 800
exp_bytes = 131802
expected_calls_and_values = [
(mock.call(['iptables', '-t', 'filter', '-L', 'OUTPUT',
'-n', '-v', '-x', '-Z'],
run_as_root=True),
TRAFFIC_COUNTERS_DUMP),
(mock.call(['iptables', '-t', 'raw', '-L', 'OUTPUT', '-n',
'-v', '-x', '-Z'],
run_as_root=True),
''),
(mock.call(['iptables', '-t', 'mangle', '-L', 'OUTPUT', '-n',
'-v', '-x', '-Z'],
run_as_root=True),
''),
(mock.call(['iptables', '-t', 'nat', '-L', 'OUTPUT', '-n',
'-v', '-x', '-Z'],
run_as_root=True),
'')
]
if use_ipv6:
expected_calls_and_values.append(
(mock.call(['ip6tables', '-t', 'raw', '-L', 'OUTPUT',
'-n', '-v', '-x', '-Z'], run_as_root=True),
''))
expected_calls_and_values.append(
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
'-n', '-v', '-x', '-Z'],
run_as_root=True),
TRAFFIC_COUNTERS_DUMP))
expected_calls_and_values.append(
(mock.call(['ip6tables', '-t', 'mangle', '-L', 'OUTPUT',
'-n', '-v', '-x', '-Z'], run_as_root=True),
''))
exp_packets *= 2
exp_bytes *= 2
tools.setup_mock_calls(self.execute, expected_calls_and_values)
acc = self.iptables.get_traffic_counters('OUTPUT', zero=True)
self.assertEqual(acc['pkts'], exp_packets)
self.assertEqual(acc['bytes'], exp_bytes)
tools.verify_mock_calls(self.execute, expected_calls_and_values,
any_order=True)
def test_get_traffic_counters_with_zero(self):
self._test_get_traffic_counters_with_zero_helper(False)
def test_get_traffic_counters_with_zero_with_ipv6(self):
self._test_get_traffic_counters_with_zero_helper(True)
class IptablesManagerStateLessTestCase(base.BaseTestCase):
def setUp(self):
super(IptablesManagerStateLessTestCase, self).setUp()
cfg.CONF.set_override('comment_iptables_rules', False, 'AGENT')
self.iptables = (iptables_manager.IptablesManager(state_less=True))
def test_nat_not_found(self):
self.assertNotIn('nat', self.iptables.ipv4)
def test_mangle_not_found(self):
self.assertNotIn('mangle', self.iptables.ipv4)