@ -67,8 +67,8 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
def setUp ( self ) :
super ( IptablesManagerStateFulTestCase , self ) . setUp ( )
self . root_helper = ' sudo '
self . iptables = ( iptables_manager .
IptablesManager ( root_helper = self . root_helper ) )
self . iptables = iptables_manager . IptablesManager (
root_helper = self . root_helper )
self . execute = mock . patch . object ( self . iptables , " execute " ) . start ( )
def test_binary_name ( self ) :
@ -85,12 +85,32 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
self . assertEqual ( iptables_manager . get_chain_name ( name , wrap = True ) ,
name [ : 11 ] )
def test_add_and_remove_chain_custom_binary_name ( self ) :
def _extend_with_ip6tables_filter ( self , expected_calls , filter_dump ) :
expected_calls . insert ( 2 , (
mock . call ( [ ' ip6tables-save ' , ' -c ' ] ,
root_helper = self . root_helper ) ,
' ' ) )
expected_calls . insert ( 3 , (
mock . call ( [ ' ip6tables-restore ' , ' -c ' ] ,
process_input = filter_dump ,
root_helper = self . root_helper ) ,
None ) )
expected_calls . extend ( [
( mock . call ( [ ' ip6tables-save ' , ' -c ' ] ,
root_helper = self . root_helper ) ,
' ' ) ,
( mock . call ( [ ' ip6tables-restore ' , ' -c ' ] ,
process_input = filter_dump ,
root_helper = self . root_helper ) ,
None ) ] )
def _test_add_and_remove_chain_custom_binary_name_helper ( self , use_ipv6 ) :
bn = ( " abcdef " * 5 )
self . iptables = ( iptables_manager .
IptablesManager ( root_helper = self . root_helper ,
binary_name = bn ) )
self . iptables = iptables_manager . IptablesManager (
root_helper = self . root_helper ,
binary_name = bn ,
use_ipv6 = use_ipv6 )
self . execute = mock . patch . object ( self . iptables , " execute " ) . start ( )
iptables_args = { ' bn ' : bn [ : 16 ] }
@ -112,6 +132,23 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
' COMMIT \n '
' # Completed by iptables_manager \n ' % iptables_args )
filter_dump_ipv6 = ( ' # Generated by iptables_manager \n '
' *filter \n '
' :neutron-filter-top - [0:0] \n '
' : %(bn)s -FORWARD - [0:0] \n '
' : %(bn)s -INPUT - [0:0] \n '
' : %(bn)s -local - [0:0] \n '
' : %(bn)s -OUTPUT - [0:0] \n '
' [0:0] -A FORWARD -j neutron-filter-top \n '
' [0:0] -A OUTPUT -j neutron-filter-top \n '
' [0:0] -A neutron-filter-top -j %(bn)s -local \n '
' [0:0] -A INPUT -j %(bn)s -INPUT \n '
' [0:0] -A OUTPUT -j %(bn)s -OUTPUT \n '
' [0:0] -A FORWARD -j %(bn)s -FORWARD \n '
' COMMIT \n '
' # Completed by iptables_manager \n ' %
iptables_args )
filter_dump_mod = ( ' # Generated by iptables_manager \n '
' *filter \n '
' :neutron-filter-top - [0:0] \n '
@ -164,6 +201,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
root_helper = self . root_helper ) ,
None ) ,
]
if use_ipv6 :
self . _extend_with_ip6tables_filter ( expected_calls_and_values ,
filter_dump_ipv6 )
tools . setup_mock_calls ( self . execute , expected_calls_and_values )
self . iptables . ipv4 [ ' filter ' ] . add_chain ( ' filter ' )
@ -174,12 +215,19 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
tools . verify_mock_calls ( self . execute , expected_calls_and_values )
def test_empty_chain_custom_binary_name ( self ) :
def test_add_and_remove_chain_custom_binary_name ( self ) :
self . _test_add_and_remove_chain_custom_binary_name_helper ( False )
def test_add_and_remove_chain_custom_binary_name_with_ipv6 ( self ) :
self . _test_add_and_remove_chain_custom_binary_name_helper ( True )
def _test_empty_chain_custom_binary_name_helper ( self , use_ipv6 ) :
bn = ( " abcdef " * 5 ) [ : 16 ]
self . iptables = ( iptables_manager .
IptablesManager ( root_helper = self . root_helper ,
binary_name = bn ) )
self . iptables = iptables_manager . IptablesManager (
root_helper = self . root_helper ,
binary_name = bn ,
use_ipv6 = use_ipv6 )
self . execute = mock . patch . object ( self . iptables , " execute " ) . start ( )
iptables_args = { ' bn ' : bn }
@ -253,6 +301,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
root_helper = self . root_helper ) ,
None ) ,
]
if use_ipv6 :
self . _extend_with_ip6tables_filter ( expected_calls_and_values ,
filter_dump )
tools . setup_mock_calls ( self . execute , expected_calls_and_values )
self . iptables . ipv4 [ ' filter ' ] . add_chain ( ' filter ' )
@ -265,7 +317,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
tools . verify_mock_calls ( self . execute , expected_calls_and_values )
def test_add_and_remove_chain ( self ) :
def test_empty_chain_custom_binary_name ( self ) :
self . _test_empty_chain_custom_binary_name_helper ( False )
def test_empty_chain_custom_binary_name_with_ipv6 ( self ) :
self . _test_empty_chain_custom_binary_name_helper ( True )
def _test_add_and_remove_chain_helper ( self , use_ipv6 ) :
self . iptables = iptables_manager . IptablesManager (
root_helper = self . root_helper ,
use_ipv6 = use_ipv6 )
self . execute = mock . patch . object ( self . iptables , " execute " ) . start ( )
filter_dump_mod = ( ' # Generated by iptables_manager \n '
' *filter \n '
' :neutron-filter-top - [0:0] \n '
@ -300,6 +363,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
root_helper = self . root_helper ) ,
None ) ,
]
if use_ipv6 :
self . _extend_with_ip6tables_filter ( expected_calls_and_values ,
FILTER_DUMP )
tools . setup_mock_calls ( self . execute , expected_calls_and_values )
self . iptables . ipv4 [ ' filter ' ] . add_chain ( ' filter ' )
@ -310,7 +377,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
tools . verify_mock_calls ( self . execute , expected_calls_and_values )
def test_add_filter_rule ( self ) :
def test_add_and_remove_chain ( self ) :
self . _test_add_and_remove_chain_helper ( False )
def test_add_and_remove_chain_with_ipv6 ( self ) :
self . _test_add_and_remove_chain_helper ( True )
def _test_add_filter_rule_helper ( self , use_ipv6 ) :
self . iptables = iptables_manager . IptablesManager (
root_helper = self . root_helper ,
use_ipv6 = use_ipv6 )
self . execute = mock . patch . object ( self . iptables , " execute " ) . start ( )
filter_dump_mod = ( ' # Generated by iptables_manager \n '
' *filter \n '
' :neutron-filter-top - [0:0] \n '
@ -349,6 +427,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
) ,
None ) ,
]
if use_ipv6 :
self . _extend_with_ip6tables_filter ( expected_calls_and_values ,
FILTER_DUMP )
tools . setup_mock_calls ( self . execute , expected_calls_and_values )
self . iptables . ipv4 [ ' filter ' ] . add_chain ( ' filter ' )
@ -369,7 +451,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
tools . verify_mock_calls ( self . execute , expected_calls_and_values )
def test_rule_with_wrap_target ( self ) :
def test_add_filter_rule ( self ) :
self . _test_add_filter_rule_helper ( False )
def test_add_filter_rule_with_ipv6 ( self ) :
self . _test_add_filter_rule_helper ( True )
def _test_rule_with_wrap_target_helper ( self , use_ipv6 ) :
self . iptables = iptables_manager . IptablesManager (
root_helper = self . root_helper ,
use_ipv6 = use_ipv6 )
self . execute = mock . patch . object ( self . iptables , " execute " ) . start ( )
name = ' 0123456789 ' * 5
wrap = " %s - %s " % ( iptables_manager . binary_name ,
iptables_manager . get_chain_name ( name ) )
@ -413,6 +506,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
root_helper = self . root_helper ) ,
None ) ,
]
if use_ipv6 :
self . _extend_with_ip6tables_filter ( expected_calls_and_values ,
FILTER_DUMP )
tools . setup_mock_calls ( self . execute , expected_calls_and_values )
self . iptables . ipv4 [ ' filter ' ] . add_chain ( name )
@ -430,7 +527,18 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
tools . verify_mock_calls ( self . execute , expected_calls_and_values )
def test_add_nat_rule ( self ) :
def test_rule_with_wrap_target ( self ) :
self . _test_rule_with_wrap_target_helper ( False )
def test_rule_with_wrap_target_with_ipv6 ( self ) :
self . _test_rule_with_wrap_target_helper ( True )
def _test_add_nat_rule_helper ( self , use_ipv6 ) :
self . iptables = iptables_manager . IptablesManager (
root_helper = self . root_helper ,
use_ipv6 = use_ipv6 )
self . execute = mock . patch . object ( self . iptables , " execute " ) . start ( )
nat_dump = ( ' # Generated by iptables_manager \n '
' *nat \n '
' :neutron-postrouting-bottom - [0:0] \n '
@ -488,6 +596,10 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
root_helper = self . root_helper ) ,
None ) ,
]
if use_ipv6 :
self . _extend_with_ip6tables_filter ( expected_calls_and_values ,
FILTER_DUMP )
tools . setup_mock_calls ( self . execute , expected_calls_and_values )
self . iptables . ipv4 [ ' nat ' ] . add_chain ( ' nat ' )
@ -512,6 +624,12 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
tools . verify_mock_calls ( self . execute , expected_calls_and_values )
def test_add_nat_rule ( self ) :
self . _test_add_nat_rule_helper ( False )
def test_add_nat_rule_with_ipv6 ( self ) :
self . _test_add_nat_rule_helper ( True )
def test_add_rule_to_a_nonexistent_chain ( self ) :
self . assertRaises ( LookupError , self . iptables . ipv4 [ ' filter ' ] . add_rule ,
' nonexistent ' , ' -j DROP ' )
@ -604,7 +722,14 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
' Attempted to get traffic counters of chain %s which '
' does not exist ' , ' chain1 ' )
def test_get_traffic_counters ( self ) :
def _test_get_traffic_counters_helper ( self , use_ipv6 ) :
self . iptables = iptables_manager . IptablesManager (
root_helper = self . root_helper ,
use_ipv6 = use_ipv6 )
self . execute = mock . patch . object ( self . iptables , " execute " ) . start ( )
exp_packets = 800
exp_bytes = 131802
iptables_dump = (
' Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes) \n '
' pkts bytes target prot opt in out source '
@ -623,20 +748,38 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
' -v ' , ' -x ' ] ,
root_helper = self . root_helper ) ,
' ' ) ,
( mock . call ( [ ' ip6tables ' , ' -t ' , ' filter ' , ' -L ' , ' OUTPUT ' ,
' -n ' , ' -v ' , ' -x ' ] ,
root_helper = self . root_helper ) ,
iptables_dump ) ,
]
if use_ipv6 :
expected_calls_and_values . append (
( mock . call ( [ ' ip6tables ' , ' -t ' , ' filter ' , ' -L ' , ' OUTPUT ' ,
' -n ' , ' -v ' , ' -x ' ] ,
root_helper = self . root_helper ) ,
iptables_dump ) )
exp_packets * = 2
exp_bytes * = 2
tools . setup_mock_calls ( self . execute , expected_calls_and_values )
acc = self . iptables . get_traffic_counters ( ' OUTPUT ' )
self . assertEqual ( acc [ ' pkts ' ] , 1600 )
self . assertEqual ( acc [ ' bytes ' ] , 263604 )
self . assertEqual ( acc [ ' pkts ' ] , exp_packets )
self . assertEqual ( acc [ ' bytes ' ] , exp_bytes )
tools . verify_mock_calls ( self . execute , expected_calls_and_values )
def test_get_traffic_counters_with_zero ( self ) :
def test_get_traffic_counters ( self ) :
self . _test_get_traffic_counters_helper ( False )
def test_get_traffic_counters_with_ipv6 ( self ) :
self . _test_get_traffic_counters_helper ( True )
def _test_get_traffic_counters_with_zero_helper ( self , use_ipv6 ) :
self . iptables = iptables_manager . IptablesManager (
root_helper = self . root_helper ,
use_ipv6 = use_ipv6 )
self . execute = mock . patch . object ( self . iptables , " execute " ) . start ( )
exp_packets = 800
exp_bytes = 131802
iptables_dump = (
' Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes) \n '
' pkts bytes target prot opt in out source '
@ -654,20 +797,31 @@ class IptablesManagerStateFulTestCase(base.BaseTestCase):
( mock . call ( [ ' iptables ' , ' -t ' , ' nat ' , ' -L ' , ' OUTPUT ' , ' -n ' ,
' -v ' , ' -x ' , ' -Z ' ] ,
root_helper = self . root_helper ) ,
' ' ) ,
( mock . call ( [ ' ip6tables ' , ' -t ' , ' filter ' , ' -L ' , ' OUTPUT ' ,
' -n ' , ' -v ' , ' -x ' , ' -Z ' ] ,
root_helper = self . root_helper ) ,
iptables_dump ) ,
' ' )
]
if use_ipv6 :
expected_calls_and_values . append (
( mock . call ( [ ' ip6tables ' , ' -t ' , ' filter ' , ' -L ' , ' OUTPUT ' ,
' -n ' , ' -v ' , ' -x ' , ' -Z ' ] ,
root_helper = self . root_helper ) ,
iptables_dump ) )
exp_packets * = 2
exp_bytes * = 2
tools . setup_mock_calls ( self . execute , expected_calls_and_values )
acc = self . iptables . get_traffic_counters ( ' OUTPUT ' , zero = True )
self . assertEqual ( acc [ ' pkts ' ] , 1600 )
self . assertEqual ( acc [ ' bytes ' ] , 263604 )
self . assertEqual ( acc [ ' pkts ' ] , exp_packets )
self . assertEqual ( acc [ ' bytes ' ] , exp_bytes )
tools . verify_mock_calls ( self . execute , expected_calls_and_values )
def test_get_traffic_counters_with_zero ( self ) :
self . _test_get_traffic_counters_with_zero_helper ( False )
def test_get_traffic_counters_with_zero_with_ipv6 ( self ) :
self . _test_get_traffic_counters_with_zero_helper ( True )
def _test_find_last_entry ( self , find_str ) :
filter_list = [ ' :neutron-filter-top - [0:0] ' ,
' : %(bn)s -FORWARD - [0:0] ' ,