@ -1256,12 +1256,12 @@ class IptablesFirewallEnhancedIpsetTestCase(BaseIptablesFirewallTestCase):
self . firewall . pre_sg_members = { }
port = self . _fake_port ( )
self . firewall . prepare_port_filter ( port )
calls = [ mock . call . create_ipset_chain ( ' IPv4fake_sgid' , ' IPv4 ' ) ,
calls = [ mock . call . create_ipset_chain ( ' NET IPv4fake_sgid' , ' IPv4 ' ) ,
mock . call . refresh_ipset_chain_by_name (
' IPv4fake_sgid' , [ ' 10.0.0.1 ' , ' 10.0.0.2 ' ] , ' IPv4 ' ) ,
mock . call . create_ipset_chain ( ' IPv6fake_sgid' , ' IPv6 ' ) ,
' NET IPv4fake_sgid' , [ ' 10.0.0.1 ' , ' 10.0.0.2 ' ] , ' IPv4 ' ) ,
mock . call . create_ipset_chain ( ' NET IPv6fake_sgid' , ' IPv6 ' ) ,
mock . call . refresh_ipset_chain_by_name (
' IPv6fake_sgid' , [ ' fe80::1 ' ] , ' IPv6 ' ) ]
' NET IPv6fake_sgid' , [ ' fe80::1 ' ] , ' IPv6 ' ) ]
self . firewall . ipset . assert_has_calls ( calls )
@ -1273,18 +1273,18 @@ class IptablesFirewallEnhancedIpsetTestCase(BaseIptablesFirewallTestCase):
self . firewall . pre_sg_members = { }
port = self . _fake_port ( )
self . firewall . prepare_port_filter ( port )
calls = [ mock . call . create_ipset_chain ( ' IPv4fake_sgid' , ' IPv4 ' ) ,
calls = [ mock . call . create_ipset_chain ( ' NET IPv4fake_sgid' , ' IPv4 ' ) ,
mock . call . refresh_ipset_chain_by_name (
' IPv4fake_sgid' , TEST_IP_RANGE [ : 5 ] , ' IPv4 ' ) ,
mock . call . create_ipset_chain ( ' IPv6fake_sgid' , ' IPv6 ' ) ,
' NET IPv4fake_sgid' , TEST_IP_RANGE [ : 5 ] , ' IPv4 ' ) ,
mock . call . create_ipset_chain ( ' NET IPv6fake_sgid' , ' IPv6 ' ) ,
mock . call . refresh_ipset_chain_by_name (
' IPv6fake_sgid' , [ ' fe80::1 ' ] , ' IPv6 ' ) ]
' NET IPv6fake_sgid' , [ ' fe80::1 ' ] , ' IPv6 ' ) ]
self . firewall . ipset . assert_has_calls ( calls )
def test_prepare_port_filter_with_ipset_chain_exist ( self ) :
self . firewall . sg_rules = self . _fake_sg_rule ( )
self . firewall . ipset_chains = { ' IPv4fake_sgid' : [ ' 10.0.0.2 ' ] }
self . firewall . ipset_chains = { ' NET IPv4fake_sgid' : [ ' 10.0.0.2 ' ] }
self . firewall . sg_members = { ' fake_sgid ' : {
' IPv4 ' : TEST_IP_RANGE [ : 5 ] ,
' IPv6 ' : [ ' fe80::1 ' ] } }
@ -1294,19 +1294,23 @@ class IptablesFirewallEnhancedIpsetTestCase(BaseIptablesFirewallTestCase):
port = self . _fake_port ( )
self . firewall . prepare_port_filter ( port )
calls = [
mock . call . add_member_to_ipset_chain ( ' IPv4fake_sgid ' , ' 10.0.0.1 ' ) ,
mock . call . add_member_to_ipset_chain ( ' IPv4fake_sgid ' , ' 10.0.0.3 ' ) ,
mock . call . add_member_to_ipset_chain ( ' IPv4fake_sgid ' , ' 10.0.0.4 ' ) ,
mock . call . add_member_to_ipset_chain ( ' IPv4fake_sgid ' , ' 10.0.0.5 ' ) ,
mock . call . create_ipset_chain ( ' IPv6fake_sgid ' , ' IPv6 ' ) ,
mock . call . add_member_to_ipset_chain ( ' NETIPv4fake_sgid ' ,
' 10.0.0.1 ' ) ,
mock . call . add_member_to_ipset_chain ( ' NETIPv4fake_sgid ' ,
' 10.0.0.3 ' ) ,
mock . call . add_member_to_ipset_chain ( ' NETIPv4fake_sgid ' ,
' 10.0.0.4 ' ) ,
mock . call . add_member_to_ipset_chain ( ' NETIPv4fake_sgid ' ,
' 10.0.0.5 ' ) ,
mock . call . create_ipset_chain ( ' NETIPv6fake_sgid ' , ' IPv6 ' ) ,
mock . call . refresh_ipset_chain_by_name (
' IPv6fake_sgid ' , [ ' fe80::1 ' ] , ' IPv6 ' ) ]
' NET IPv6fake_sgid' , [ ' fe80::1 ' ] , ' IPv6 ' ) ]
self . firewall . ipset . assert_has_calls ( calls , True )
def test_prepare_port_filter_with_del_member ( self ) :
self . firewall . sg_rules = self . _fake_sg_rule ( )
self . firewall . ipset_chains = { ' IPv4fake_sgid' : [ ' 10.0.0.2 ' ] }
self . firewall . ipset_chains = { ' NET IPv4fake_sgid' : [ ' 10.0.0.2 ' ] }
self . firewall . sg_members = { ' fake_sgid ' : {
' IPv4 ' : [
' 10.0.0.1 ' , ' 10.0.0.3 ' , ' 10.0.0.4 ' , ' 10.0.0.5 ' ] ,
@ -1317,20 +1321,25 @@ class IptablesFirewallEnhancedIpsetTestCase(BaseIptablesFirewallTestCase):
port = self . _fake_port ( )
self . firewall . prepare_port_filter ( port )
calls = [
mock . call . add_member_to_ipset_chain ( ' IPv4fake_sgid ' , ' 10.0.0.1 ' ) ,
mock . call . add_member_to_ipset_chain ( ' IPv4fake_sgid ' , ' 10.0.0.3 ' ) ,
mock . call . add_member_to_ipset_chain ( ' IPv4fake_sgid ' , ' 10.0.0.4 ' ) ,
mock . call . add_member_to_ipset_chain ( ' IPv4fake_sgid ' , ' 10.0.0.5 ' ) ,
mock . call . del_ipset_chain_member ( ' IPv4fake_sgid ' , ' 10.0.0.2 ' ) ,
mock . call . create_ipset_chain ( ' IPv6fake_sgid ' , ' IPv6 ' ) ,
mock . call . add_member_to_ipset_chain ( ' NETIPv4fake_sgid ' ,
' 10.0.0.1 ' ) ,
mock . call . add_member_to_ipset_chain ( ' NETIPv4fake_sgid ' ,
' 10.0.0.3 ' ) ,
mock . call . add_member_to_ipset_chain ( ' NETIPv4fake_sgid ' ,
' 10.0.0.4 ' ) ,
mock . call . add_member_to_ipset_chain ( ' NETIPv4fake_sgid ' ,
' 10.0.0.5 ' ) ,
mock . call . del_ipset_chain_member ( ' NETIPv4fake_sgid ' ,
' 10.0.0.2 ' ) ,
mock . call . create_ipset_chain ( ' NETIPv6fake_sgid ' , ' IPv6 ' ) ,
mock . call . refresh_ipset_chain_by_name (
' IPv6fake_sgid ' , [ ' fe80::1 ' ] , ' IPv6 ' ) ]
' NET IPv6fake_sgid' , [ ' fe80::1 ' ] , ' IPv6 ' ) ]
self . firewall . ipset . assert_has_calls ( calls , True )
def test_prepare_port_filter_change_beyond_9 ( self ) :
self . firewall . sg_rules = self . _fake_sg_rule ( )
self . firewall . ipset_chains = { ' IPv4fake_sgid' : TEST_IP_RANGE [ 5 : ] }
self . firewall . ipset_chains = { ' NET IPv4fake_sgid' : TEST_IP_RANGE [ 5 : ] }
self . firewall . sg_members = { ' fake_sgid ' : {
' IPv4 ' : TEST_IP_RANGE [ : 5 ] ,
' IPv6 ' : [ ' fe80::1 ' ] } }
@ -1340,11 +1349,11 @@ class IptablesFirewallEnhancedIpsetTestCase(BaseIptablesFirewallTestCase):
port = self . _fake_port ( )
self . firewall . prepare_port_filter ( port )
calls = [
mock . call . refresh_ipset_chain_by_name ( ' IPv4fake_sgid' ,
mock . call . refresh_ipset_chain_by_name ( ' NET IPv4fake_sgid' ,
TEST_IP_RANGE [ : 5 ] , ' IPv4 ' ) ,
mock . call . create_ipset_chain ( ' IPv6fake_sgid' , ' IPv6 ' ) ,
mock . call . create_ipset_chain ( ' NET IPv6fake_sgid' , ' IPv6 ' ) ,
mock . call . refresh_ipset_chain_by_name (
' IPv6fake_sgid' , [ ' fe80::1 ' ] , ' IPv6 ' ) ]
' NET IPv6fake_sgid' , [ ' fe80::1 ' ] , ' IPv6 ' ) ]
self . firewall . ipset . assert_has_calls ( calls )
@ -1359,11 +1368,11 @@ class IptablesFirewallEnhancedIpsetTestCase(BaseIptablesFirewallTestCase):
port = self . _fake_port ( )
port [ ' security_group_source_groups ' ] . append ( ' fake_sgid2 ' )
self . firewall . prepare_port_filter ( port )
calls = [ mock . call . create_ipset_chain ( ' IPv4fake_sgid' , ' IPv4 ' ) ,
calls = [ mock . call . create_ipset_chain ( ' NET IPv4fake_sgid' , ' IPv4 ' ) ,
mock . call . refresh_ipset_chain_by_name (
' IPv4fake_sgid' , [ ' 10.0.0.1 ' , ' 10.0.0.2 ' ] , ' IPv4 ' ) ,
mock . call . create_ipset_chain ( ' IPv6fake_sgid' , ' IPv6 ' ) ,
' NET IPv4fake_sgid' , [ ' 10.0.0.1 ' , ' 10.0.0.2 ' ] , ' IPv4 ' ) ,
mock . call . create_ipset_chain ( ' NET IPv6fake_sgid' , ' IPv6 ' ) ,
mock . call . refresh_ipset_chain_by_name (
' IPv6fake_sgid' , [ ' fe80::1 ' ] , ' IPv6 ' ) ]
' NET IPv6fake_sgid' , [ ' fe80::1 ' ] , ' IPv6 ' ) ]
self . firewall . ipset . assert_has_calls ( calls )