Ensure ip6tables are used only if ipv6 is enabled in kernel
On systems where ipv6 module is not loaded in kernel we need to avoid usage of ip6tables. This patch reads /proc/sys/net/ipv6/conf/default/disable_ipv6 file and if ipv6 is disabled then ip6tables are not used in IptablesManager Closes-Bug: #1352893 Change-Id: I07e5851aa25eb98b7a97dff86b9850475df85f64
This commit is contained in:
parent
0e6864b3f4
commit
607aaae88e
|
@ -32,6 +32,7 @@ from neutron.agent.linux import ra
|
|||
from neutron.agent import rpc as agent_rpc
|
||||
from neutron.common import config as common_config
|
||||
from neutron.common import constants as l3_constants
|
||||
from neutron.common import ipv6_utils
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils as common_utils
|
||||
|
@ -144,7 +145,8 @@ class L3PluginApi(n_rpc.RpcProxy):
|
|||
|
||||
class RouterInfo(object):
|
||||
|
||||
def __init__(self, router_id, root_helper, use_namespaces, router):
|
||||
def __init__(self, router_id, root_helper, use_namespaces, router,
|
||||
use_ipv6=False):
|
||||
self.router_id = router_id
|
||||
self.ex_gw_port = None
|
||||
self._snat_enabled = None
|
||||
|
@ -160,7 +162,7 @@ class RouterInfo(object):
|
|||
self.ns_name = NS_PREFIX + router_id if use_namespaces else None
|
||||
self.iptables_manager = iptables_manager.IptablesManager(
|
||||
root_helper=root_helper,
|
||||
#FIXME(danwent): use_ipv6=True,
|
||||
use_ipv6=use_ipv6,
|
||||
namespace=self.ns_name)
|
||||
self.routes = []
|
||||
# DVR Data
|
||||
|
@ -440,6 +442,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
|||
super(L3NATAgent, self).__init__(conf=self.conf)
|
||||
|
||||
self.target_ex_net_id = None
|
||||
self.use_ipv6 = ipv6_utils.is_enabled()
|
||||
|
||||
def _check_config_params(self):
|
||||
"""Check items in configuration files.
|
||||
|
@ -609,7 +612,8 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
|||
|
||||
def _router_added(self, router_id, router):
|
||||
ri = RouterInfo(router_id, self.root_helper,
|
||||
self.conf.use_namespaces, router)
|
||||
self.conf.use_namespaces, router,
|
||||
use_ipv6=self.use_ipv6)
|
||||
self.router_info[router_id] = ri
|
||||
if self.conf.use_namespaces:
|
||||
self._create_router_namespace(ri)
|
||||
|
@ -1076,11 +1080,10 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
|
|||
SNAT_INT_DEV_PREFIX)
|
||||
self._external_gateway_added(ri, ex_gw_port, gw_interface_name,
|
||||
snat_ns_name, preserve_ips=[])
|
||||
ri.snat_iptables_manager = (
|
||||
iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper, namespace=snat_ns_name
|
||||
)
|
||||
)
|
||||
ri.snat_iptables_manager = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
namespace=snat_ns_name,
|
||||
use_ipv6=self.use_ipv6)
|
||||
|
||||
def external_gateway_added(self, ri, ex_gw_port, interface_name):
|
||||
if ri.router['distributed']:
|
||||
|
|
|
@ -19,6 +19,7 @@ from oslo.config import cfg
|
|||
from neutron.agent import firewall
|
||||
from neutron.agent.linux import iptables_manager
|
||||
from neutron.common import constants
|
||||
from neutron.common import ipv6_utils
|
||||
from neutron.openstack.common import log as logging
|
||||
|
||||
|
||||
|
@ -41,7 +42,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
|
|||
def __init__(self):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=cfg.CONF.AGENT.root_helper,
|
||||
use_ipv6=True)
|
||||
use_ipv6=ipv6_utils.is_enabled())
|
||||
# list of port which has security group
|
||||
self.filtered_ports = {}
|
||||
self._add_fallback_chain_v4v6()
|
||||
|
|
|
@ -625,8 +625,10 @@ class IptablesManager(object):
|
|||
cmd_tables = [('iptables', key) for key, table in self.ipv4.items()
|
||||
if name in table._select_chain_set(wrap)]
|
||||
|
||||
cmd_tables += [('ip6tables', key) for key, table in self.ipv6.items()
|
||||
if name in table._select_chain_set(wrap)]
|
||||
if self.use_ipv6:
|
||||
cmd_tables += [('ip6tables', key)
|
||||
for key, table in self.ipv6.items()
|
||||
if name in table._select_chain_set(wrap)]
|
||||
|
||||
return cmd_tables
|
||||
|
||||
|
|
|
@ -20,6 +20,9 @@ IPv6-related utilities and helper functions.
|
|||
import netaddr
|
||||
|
||||
|
||||
_IS_IPV6_ENABLED = None
|
||||
|
||||
|
||||
def get_ipv6_addr_by_EUI64(prefix, mac):
|
||||
# Check if the prefix is IPv4 address
|
||||
isIPv4 = netaddr.valid_ipv4(prefix)
|
||||
|
@ -37,3 +40,14 @@ def get_ipv6_addr_by_EUI64(prefix, mac):
|
|||
except TypeError:
|
||||
raise TypeError(_('Bad prefix type for generate IPv6 address by '
|
||||
'EUI-64: %s') % prefix)
|
||||
|
||||
|
||||
def is_enabled():
|
||||
global _IS_IPV6_ENABLED
|
||||
|
||||
if _IS_IPV6_ENABLED is None:
|
||||
disabled_ipv6_path = "/proc/sys/net/ipv6/conf/default/disable_ipv6"
|
||||
with open(disabled_ipv6_path, 'r') as f:
|
||||
disabled = f.read().strip()
|
||||
_IS_IPV6_ENABLED = disabled == "0"
|
||||
return _IS_IPV6_ENABLED
|
||||
|
|
|
@ -20,6 +20,7 @@ from neutron.agent.common import config
|
|||
from neutron.agent.linux import interface
|
||||
from neutron.agent.linux import iptables_manager
|
||||
from neutron.common import constants as constants
|
||||
from neutron.common import ipv6_utils
|
||||
from neutron.common import log
|
||||
from neutron.openstack.common import importutils
|
||||
from neutron.openstack.common import log as logging
|
||||
|
@ -74,7 +75,8 @@ class RouterWithMetering(object):
|
|||
self.iptables_manager = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
namespace=self.ns_name,
|
||||
binary_name=WRAP_NAME)
|
||||
binary_name=WRAP_NAME,
|
||||
use_ipv6=ipv6_utils.is_enabled())
|
||||
self.metering_labels = {}
|
||||
|
||||
|
||||
|
|
|
@ -65,7 +65,8 @@ class IptablesDriverTestCase(base.BaseTestCase):
|
|||
|
||||
self.iptables_cls.assert_called_with(root_helper='fake_sudo',
|
||||
namespace=mock.ANY,
|
||||
binary_name=mock.ANY)
|
||||
binary_name=mock.ANY,
|
||||
use_ipv6=mock.ANY)
|
||||
|
||||
def test_add_metering_label(self):
|
||||
routers = [{'_metering_labels': [
|
||||
|
|
|
@ -67,8 +67,8 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
def setUp(self):
|
||||
super(IptablesManagerStateFulTestCase, self).setUp()
|
||||
self.root_helper = 'sudo'
|
||||
self.iptables = (iptables_manager.
|
||||
IptablesManager(root_helper=self.root_helper))
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
def test_binary_name(self):
|
||||
|
@ -85,12 +85,32 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
self.assertEqual(iptables_manager.get_chain_name(name, wrap=True),
|
||||
name[:11])
|
||||
|
||||
def test_add_and_remove_chain_custom_binary_name(self):
|
||||
def _extend_with_ip6tables_filter(self, expected_calls, filter_dump):
|
||||
expected_calls.insert(2, (
|
||||
mock.call(['ip6tables-save', '-c'],
|
||||
root_helper=self.root_helper),
|
||||
''))
|
||||
expected_calls.insert(3, (
|
||||
mock.call(['ip6tables-restore', '-c'],
|
||||
process_input=filter_dump,
|
||||
root_helper=self.root_helper),
|
||||
None))
|
||||
expected_calls.extend([
|
||||
(mock.call(['ip6tables-save', '-c'],
|
||||
root_helper=self.root_helper),
|
||||
''),
|
||||
(mock.call(['ip6tables-restore', '-c'],
|
||||
process_input=filter_dump,
|
||||
root_helper=self.root_helper),
|
||||
None)])
|
||||
|
||||
def _test_add_and_remove_chain_custom_binary_name_helper(self, use_ipv6):
|
||||
bn = ("abcdef" * 5)
|
||||
|
||||
self.iptables = (iptables_manager.
|
||||
IptablesManager(root_helper=self.root_helper,
|
||||
binary_name=bn))
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
binary_name=bn,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
iptables_args = {'bn': bn[:16]}
|
||||
|
@ -112,6 +132,23 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
'COMMIT\n'
|
||||
'# Completed by iptables_manager\n' % iptables_args)
|
||||
|
||||
filter_dump_ipv6 = ('# Generated by iptables_manager\n'
|
||||
'*filter\n'
|
||||
':neutron-filter-top - [0:0]\n'
|
||||
':%(bn)s-FORWARD - [0:0]\n'
|
||||
':%(bn)s-INPUT - [0:0]\n'
|
||||
':%(bn)s-local - [0:0]\n'
|
||||
':%(bn)s-OUTPUT - [0:0]\n'
|
||||
'[0:0] -A FORWARD -j neutron-filter-top\n'
|
||||
'[0:0] -A OUTPUT -j neutron-filter-top\n'
|
||||
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
|
||||
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
|
||||
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
|
||||
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
|
||||
'COMMIT\n'
|
||||
'# Completed by iptables_manager\n' %
|
||||
iptables_args)
|
||||
|
||||
filter_dump_mod = ('# Generated by iptables_manager\n'
|
||||
'*filter\n'
|
||||
':neutron-filter-top - [0:0]\n'
|
||||
|
@ -164,6 +201,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
root_helper=self.root_helper),
|
||||
None),
|
||||
]
|
||||
if use_ipv6:
|
||||
self._extend_with_ip6tables_filter(expected_calls_and_values,
|
||||
filter_dump_ipv6)
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
self.iptables.ipv4['filter'].add_chain('filter')
|
||||
|
@ -174,12 +215,19 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_empty_chain_custom_binary_name(self):
|
||||
def test_add_and_remove_chain_custom_binary_name(self):
|
||||
self._test_add_and_remove_chain_custom_binary_name_helper(False)
|
||||
|
||||
def test_add_and_remove_chain_custom_binary_name_with_ipv6(self):
|
||||
self._test_add_and_remove_chain_custom_binary_name_helper(True)
|
||||
|
||||
def _test_empty_chain_custom_binary_name_helper(self, use_ipv6):
|
||||
bn = ("abcdef" * 5)[:16]
|
||||
|
||||
self.iptables = (iptables_manager.
|
||||
IptablesManager(root_helper=self.root_helper,
|
||||
binary_name=bn))
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
binary_name=bn,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
iptables_args = {'bn': bn}
|
||||
|
@ -253,6 +301,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
root_helper=self.root_helper),
|
||||
None),
|
||||
]
|
||||
if use_ipv6:
|
||||
self._extend_with_ip6tables_filter(expected_calls_and_values,
|
||||
filter_dump)
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
self.iptables.ipv4['filter'].add_chain('filter')
|
||||
|
@ -265,7 +317,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_add_and_remove_chain(self):
|
||||
def test_empty_chain_custom_binary_name(self):
|
||||
self._test_empty_chain_custom_binary_name_helper(False)
|
||||
|
||||
def test_empty_chain_custom_binary_name_with_ipv6(self):
|
||||
self._test_empty_chain_custom_binary_name_helper(True)
|
||||
|
||||
def _test_add_and_remove_chain_helper(self, use_ipv6):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
filter_dump_mod = ('# Generated by iptables_manager\n'
|
||||
'*filter\n'
|
||||
':neutron-filter-top - [0:0]\n'
|
||||
|
@ -300,6 +363,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
root_helper=self.root_helper),
|
||||
None),
|
||||
]
|
||||
if use_ipv6:
|
||||
self._extend_with_ip6tables_filter(expected_calls_and_values,
|
||||
FILTER_DUMP)
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
self.iptables.ipv4['filter'].add_chain('filter')
|
||||
|
@ -310,7 +377,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_add_filter_rule(self):
|
||||
def test_add_and_remove_chain(self):
|
||||
self._test_add_and_remove_chain_helper(False)
|
||||
|
||||
def test_add_and_remove_chain_with_ipv6(self):
|
||||
self._test_add_and_remove_chain_helper(True)
|
||||
|
||||
def _test_add_filter_rule_helper(self, use_ipv6):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
filter_dump_mod = ('# Generated by iptables_manager\n'
|
||||
'*filter\n'
|
||||
':neutron-filter-top - [0:0]\n'
|
||||
|
@ -349,6 +427,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
),
|
||||
None),
|
||||
]
|
||||
if use_ipv6:
|
||||
self._extend_with_ip6tables_filter(expected_calls_and_values,
|
||||
FILTER_DUMP)
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
self.iptables.ipv4['filter'].add_chain('filter')
|
||||
|
@ -369,7 +451,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_rule_with_wrap_target(self):
|
||||
def test_add_filter_rule(self):
|
||||
self._test_add_filter_rule_helper(False)
|
||||
|
||||
def test_add_filter_rule_with_ipv6(self):
|
||||
self._test_add_filter_rule_helper(True)
|
||||
|
||||
def _test_rule_with_wrap_target_helper(self, use_ipv6):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
name = '0123456789' * 5
|
||||
wrap = "%s-%s" % (iptables_manager.binary_name,
|
||||
iptables_manager.get_chain_name(name))
|
||||
|
@ -413,6 +506,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
root_helper=self.root_helper),
|
||||
None),
|
||||
]
|
||||
if use_ipv6:
|
||||
self._extend_with_ip6tables_filter(expected_calls_and_values,
|
||||
FILTER_DUMP)
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
self.iptables.ipv4['filter'].add_chain(name)
|
||||
|
@ -430,7 +527,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_add_nat_rule(self):
|
||||
def test_rule_with_wrap_target(self):
|
||||
self._test_rule_with_wrap_target_helper(False)
|
||||
|
||||
def test_rule_with_wrap_target_with_ipv6(self):
|
||||
self._test_rule_with_wrap_target_helper(True)
|
||||
|
||||
def _test_add_nat_rule_helper(self, use_ipv6):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
|
||||
nat_dump = ('# Generated by iptables_manager\n'
|
||||
'*nat\n'
|
||||
':neutron-postrouting-bottom - [0:0]\n'
|
||||
|
@ -488,6 +596,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
root_helper=self.root_helper),
|
||||
None),
|
||||
]
|
||||
if use_ipv6:
|
||||
self._extend_with_ip6tables_filter(expected_calls_and_values,
|
||||
FILTER_DUMP)
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
self.iptables.ipv4['nat'].add_chain('nat')
|
||||
|
@ -512,6 +624,12 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_add_nat_rule(self):
|
||||
self._test_add_nat_rule_helper(False)
|
||||
|
||||
def test_add_nat_rule_with_ipv6(self):
|
||||
self._test_add_nat_rule_helper(True)
|
||||
|
||||
def test_add_rule_to_a_nonexistent_chain(self):
|
||||
self.assertRaises(LookupError, self.iptables.ipv4['filter'].add_rule,
|
||||
'nonexistent', '-j DROP')
|
||||
|
@ -604,7 +722,14 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
'Attempted to get traffic counters of chain %s which '
|
||||
'does not exist', 'chain1')
|
||||
|
||||
def test_get_traffic_counters(self):
|
||||
def _test_get_traffic_counters_helper(self, use_ipv6):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
exp_packets = 800
|
||||
exp_bytes = 131802
|
||||
|
||||
iptables_dump = (
|
||||
'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
|
||||
' pkts bytes target prot opt in out source'
|
||||
|
@ -623,20 +748,38 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
'-v', '-x'],
|
||||
root_helper=self.root_helper),
|
||||
''),
|
||||
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
|
||||
'-n', '-v', '-x'],
|
||||
root_helper=self.root_helper),
|
||||
iptables_dump),
|
||||
]
|
||||
if use_ipv6:
|
||||
expected_calls_and_values.append(
|
||||
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
|
||||
'-n', '-v', '-x'],
|
||||
root_helper=self.root_helper),
|
||||
iptables_dump))
|
||||
exp_packets *= 2
|
||||
exp_bytes *= 2
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
acc = self.iptables.get_traffic_counters('OUTPUT')
|
||||
self.assertEqual(acc['pkts'], 1600)
|
||||
self.assertEqual(acc['bytes'], 263604)
|
||||
self.assertEqual(acc['pkts'], exp_packets)
|
||||
self.assertEqual(acc['bytes'], exp_bytes)
|
||||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_get_traffic_counters_with_zero(self):
|
||||
def test_get_traffic_counters(self):
|
||||
self._test_get_traffic_counters_helper(False)
|
||||
|
||||
def test_get_traffic_counters_with_ipv6(self):
|
||||
self._test_get_traffic_counters_helper(True)
|
||||
|
||||
def _test_get_traffic_counters_with_zero_helper(self, use_ipv6):
|
||||
self.iptables = iptables_manager.IptablesManager(
|
||||
root_helper=self.root_helper,
|
||||
use_ipv6=use_ipv6)
|
||||
self.execute = mock.patch.object(self.iptables, "execute").start()
|
||||
exp_packets = 800
|
||||
exp_bytes = 131802
|
||||
|
||||
iptables_dump = (
|
||||
'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
|
||||
' pkts bytes target prot opt in out source'
|
||||
|
@ -654,20 +797,31 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
|
|||
(mock.call(['iptables', '-t', 'nat', '-L', 'OUTPUT', '-n',
|
||||
'-v', '-x', '-Z'],
|
||||
root_helper=self.root_helper),
|
||||
''),
|
||||
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
|
||||
'-n', '-v', '-x', '-Z'],
|
||||
root_helper=self.root_helper),
|
||||
iptables_dump),
|
||||
'')
|
||||
]
|
||||
if use_ipv6:
|
||||
expected_calls_and_values.append(
|
||||
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
|
||||
'-n', '-v', '-x', '-Z'],
|
||||
root_helper=self.root_helper),
|
||||
iptables_dump))
|
||||
exp_packets *= 2
|
||||
exp_bytes *= 2
|
||||
|
||||
tools.setup_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
acc = self.iptables.get_traffic_counters('OUTPUT', zero=True)
|
||||
self.assertEqual(acc['pkts'], 1600)
|
||||
self.assertEqual(acc['bytes'], 263604)
|
||||
self.assertEqual(acc['pkts'], exp_packets)
|
||||
self.assertEqual(acc['bytes'], exp_bytes)
|
||||
|
||||
tools.verify_mock_calls(self.execute, expected_calls_and_values)
|
||||
|
||||
def test_get_traffic_counters_with_zero(self):
|
||||
self._test_get_traffic_counters_with_zero_helper(False)
|
||||
|
||||
def test_get_traffic_counters_with_zero_with_ipv6(self):
|
||||
self._test_get_traffic_counters_with_zero_helper(True)
|
||||
|
||||
def _test_find_last_entry(self, find_str):
|
||||
filter_list = [':neutron-filter-top - [0:0]',
|
||||
':%(bn)s-FORWARD - [0:0]',
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutron.common import ipv6_utils
|
||||
from neutron.tests import base
|
||||
|
||||
|
@ -48,3 +50,29 @@ class IPv6byEUI64TestCase(base.BaseTestCase):
|
|||
prefix = 123
|
||||
self.assertRaises(TypeError, lambda:
|
||||
ipv6_utils.get_ipv6_addr_by_EUI64(prefix, mac))
|
||||
|
||||
|
||||
class TestIsEnabled(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestIsEnabled, self).setUp()
|
||||
ipv6_utils._IS_IPV6_ENABLED = None
|
||||
mock_open = mock.patch("__builtin__.open").start()
|
||||
self.mock_read = mock_open.return_value.__enter__.return_value.read
|
||||
|
||||
def test_enabled(self):
|
||||
self.mock_read.return_value = "0"
|
||||
enabled = ipv6_utils.is_enabled()
|
||||
self.assertTrue(enabled)
|
||||
|
||||
def test_disabled(self):
|
||||
self.mock_read.return_value = "1"
|
||||
enabled = ipv6_utils.is_enabled()
|
||||
self.assertFalse(enabled)
|
||||
|
||||
def test_memoize(self):
|
||||
self.mock_read.return_value = "0"
|
||||
ipv6_utils.is_enabled()
|
||||
enabled = ipv6_utils.is_enabled()
|
||||
self.assertTrue(enabled)
|
||||
self.mock_read.assert_called_once_with()
|
||||
|
|
|
@ -1821,6 +1821,9 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
|
|||
self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
|
||||
|
||||
self.iptables = self.agent.firewall.iptables
|
||||
# TODO(jlibosva) Get rid of mocking iptables execute and mock out
|
||||
# firewall instead
|
||||
self.iptables.use_ipv6 = True
|
||||
self.iptables_execute = mock.patch.object(self.iptables,
|
||||
"execute").start()
|
||||
self.iptables_execute_return_values = []
|
||||
|
|
Loading…
Reference in New Issue