@ -71,6 +71,14 @@ class TestDvrRouter(framework.L3AgentTestFramework):
def test_dvr_router_lifecycle_ha_with_snat_with_fips ( self ) :
self . _dvr_router_lifecycle ( enable_ha = True , enable_snat = True )
def test_dvr_lifecycle_no_ha_with_snat_with_fips_with_cent_fips ( self ) :
self . _dvr_router_lifecycle ( enable_ha = False , enable_snat = True ,
snat_bound_fip = True )
def test_dvr_lifecycle_ha_with_snat_with_fips_with_cent_fips ( self ) :
self . _dvr_router_lifecycle ( enable_ha = True , enable_snat = True ,
snat_bound_fip = True )
def _helper_create_dvr_router_fips_for_ext_network (
self , agent_mode , * * dvr_router_kwargs ) :
self . agent . conf . agent_mode = agent_mode
@ -408,7 +416,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
def _dvr_router_lifecycle ( self , enable_ha = False , enable_snat = False ,
custom_mtu = 2000 ,
ip_version = lib_constants . IP_VERSION_4 ,
dual_stack = False ) :
dual_stack = False ,
snat_bound_fip = False ) :
''' Test dvr router lifecycle
: param enable_ha : sets the ha value for the router .
@ -423,7 +432,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
# We get the router info particular to a dvr router
router_info = self . generate_dvr_router_info (
enable_ha , enable_snat , extra_routes = True )
enable_ha , enable_snat , extra_routes = True ,
snat_bound_fip = snat_bound_fip )
for key in ( ' _interfaces ' , ' _snat_router_interfaces ' ,
' _floatingip_agent_interfaces ' ) :
for port in router_info [ key ] :
@ -480,9 +490,9 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self . _assert_internal_devices ( router )
self . _assert_dvr_external_device ( router )
self . _assert_dvr_gateway ( router )
self . _assert_dvr_floating_ips ( router )
self . _assert_dvr_floating_ips ( router , snat_bound_fip = snat_bound_fip )
self . _assert_snat_chains ( router )
self . _assert_floating_ip_chains ( router )
self . _assert_floating_ip_chains ( router , snat_bound_fip = snat_bound_fip )
self . _assert_metadata_chains ( router )
self . _assert_rfp_fpr_mtu ( router , custom_mtu )
if enable_snat :
@ -527,21 +537,33 @@ class TestDvrRouter(framework.L3AgentTestFramework):
extra_routes = extra_routes ,
num_internal_ports = 2 ,
enable_gw = enable_gw ,
snat_bound_fip = snat_bound_fip ,
* * kwargs )
internal_ports = router . get ( lib_constants . INTERFACE_KEY , [ ] )
router [ ' distributed ' ] = True
router [ ' gw_port_host ' ] = agent . conf . host
if enable_floating_ip :
floating_ip = router [ ' _floatingips ' ] [ 0 ]
for floating_ip in router [ lib_constants . FLOATINGIP_KEY ] :
floating_ip [ ' host ' ] = agent . conf . host
if snat_bound_fip :
floating_ip [ lib_constants . DVR_SNAT_BOUND ] = True
if enable_floating_ip and enable_centralized_fip :
# For centralizing the fip, we are emulating the legacy
# router behavior were the fip dict does not contain any
# host information.
floating_ip [ ' host ' ] = None
router [ lib_constants . FLOATINGIP_KEY ] [ 0 ] [ ' host ' ] = None
# In order to test the mixed dvr_snat and compute scenario, we create
# two floating IPs, one is distributed, another is centralized.
# The distributed floating IP should have the host, which was
# just set to None above, then we set it back. The centralized
# floating IP has host None, and this IP will be used to test
# migration from centralized to distributed.
if snat_bound_fip :
router [ lib_constants . FLOATINGIP_KEY ] [ 0 ] [ ' host ' ] = agent . conf . host
router [ lib_constants . FLOATINGIP_KEY ] [ 1 ] [
lib_constants . DVR_SNAT_BOUND ] = True
router [ lib_constants . FLOATINGIP_KEY ] [ 1 ] [ ' host ' ] = None
if enable_gw :
external_gw_port = router [ ' gw_port ' ]
router [ ' gw_port ' ] [ portbindings . HOST_ID ] = agent . conf . host
@ -551,11 +573,11 @@ class TestDvrRouter(framework.L3AgentTestFramework):
# the agent type the dvr supports. The namespace creation is
# dependent on the agent_type.
if enable_floating_ip :
floating_ip = router [ ' _floatingips ' ] [ 0 ]
floating_ip [ ' floating_network_id ' ] = (
external_gw_port [ ' network_id ' ] )
floating_ip [ ' port_id ' ] = internal_ports [ 0 ] [ ' id ' ]
floating_ip [ ' status ' ] = ' ACTIVE '
for index , floating_ip in enumerate ( router [ ' _floatingips ' ] ) :
floating_ip [ ' floating_network_id ' ] = (
external_gw_port [ ' network_id ' ] )
floating_ip [ ' port_id ' ] = internal_ports [ index ] [ ' id ' ]
floating_ip [ ' status ' ] = ' ACTIVE '
self . _add_fip_agent_gw_port_info_to_router ( router ,
external_gw_port )
@ -723,7 +745,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
router . router_id )
self . assertFalse ( self . _namespace_exists ( namespace ) )
def _assert_dvr_floating_ips ( self , router ):
def _assert_dvr_floating_ips ( self , router , snat_bound_fip = False ):
# in the fip namespace:
# Check that the fg-<port-id> (floatingip_agent_gateway)
# is created with the ip address of the external gateway port
@ -761,8 +783,12 @@ class TestDvrRouter(framework.L3AgentTestFramework):
# correctly
for fip in floating_ips :
expected_rules = router . floating_forward_rules ( fip )
if fip . get ( lib_constants . DVR_SNAT_BOUND ) :
iptables_mgr = router . snat_iptables_manager
else :
iptables_mgr = router . iptables_manager
self . _assert_iptables_rules_exist (
router . iptables_manager , ' nat ' , expected_rules )
iptables_mgr, ' nat ' , expected_rules )
def test_dvr_router_with_ha_for_fip_disassociation ( self ) :
""" Test to validate the fip rules are deleted in dvr_snat_ha router.
@ -890,6 +916,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
for rule in expected_rules :
self . assertIn (
str ( iptables_manager . IptablesRule ( rule [ 0 ] , rule [ 1 ] ) ) , rules )
return True
def test_prevent_snat_rule_exist_on_restarted_agent ( self ) :
self . agent . conf . agent_mode = ' dvr_snat '
@ -934,8 +961,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
# existing ports on the uplinked subnet, the ARP
# cache is properly populated.
self . agent . conf . agent_mode = ' dvr_snat '
router_info = l3_test_common . prepare_router_data ( )
router_info [ ' distributed ' ] = True
router_info = self . generate_dvr_router_info ( enable_snat = True )
expected_neighbor = ' 35.4.1.10 '
port_data = {
' fixed_ips ' : [ { ' ip_address ' : expected_neighbor } ] ,
@ -1082,8 +1108,9 @@ class TestDvrRouter(framework.L3AgentTestFramework):
# In the snat namespace, check the iptables rules are set correctly
for fip in snat_bound_floatingips :
expected_rules = router1 . floating_forward_rules ( fip )
self . _assert_iptables_rules_exist (
router1 . snat_iptables_manager , ' nat ' , expected_rules )
if fip . get ( lib_constants . DVR_SNAT_BOUND ) :
self . _assert_iptables_rules_exist (
router1 . snat_iptables_manager , ' nat ' , expected_rules )
def test_floating_ip_migration_from_unbound_to_bound ( self ) :
""" Test to check floating ips migrate from unboun to bound host. """
@ -1092,33 +1119,51 @@ class TestDvrRouter(framework.L3AgentTestFramework):
enable_floating_ip = True , enable_centralized_fip = True ,
enable_snat = True , snat_bound_fip = True )
router1 = self . manage_router ( self . agent , router_info )
centralized_floatingips = router_info [ lib_constants . FLOATINGIP_KEY ]
floatingips = router_info [ lib_constants . FLOATINGIP_KEY ]
distributed_fip = floatingips [ 0 ]
centralized_floatingip = floatingips [ 1 ]
# For private ports hosted in dvr_no_fip agent, the floatingip
# dict will contain the fip['host'] key, but the value will always
# be None to emulate the legacy router.
self . assertIsNone ( centralized_floatingip s[ 0 ] [ ' host ' ] )
self . assertIsNone ( centralized_floatingip [ ' host ' ] )
self . assertTrue ( self . _namespace_exists ( router1 . ns_name ) )
fip_ns = router1 . fip_ns . get_name ( )
self . assertTrue ( self . _namespace_exists ( fip_ns ) )
self . _assert_snat_namespace_exists ( router1 )
# If fips are centralized then, the DNAT rules are only
# configured in the SNAT Namespace and not in the router-ns.
router_ns = router1 . ns_name
fixed_ip = centralized_floatingips [ 0 ] [ ' fixed_ip_address ' ]
for fip in centralized_floatingips :
expected_rules = router1 . floating_forward_rules ( fip )
self . assertFalse ( self . _assert_iptables_rules_exist (
router1 . snat_iptables_manager , ' nat ' , expected_rules ) )
self . assertFalse ( self . _fixed_ip_rule_exists ( router_ns , fixed_ip ) )
# Now let us edit the floatingIP info with 'host' and remove
# the 'dvr_snat_bound'
router1 . router [ lib_constants . FLOATINGIP_KEY ] [ 0 ] [ ' host ' ] = (
expected_rules = router1 . floating_forward_rules ( distributed_fip )
self . assertTrue ( self . _assert_iptables_rules_exist (
router1 . iptables_manager , ' nat ' , expected_rules ) )
expected_rules = router1 . _centralized_floating_forward_rules (
centralized_floatingip [ ' floating_ip_address ' ] ,
centralized_floatingip [ ' fixed_ip_address ' ] )
self . assertTrue ( self . _assert_iptables_rules_exist (
router1 . snat_iptables_manager , ' nat ' , expected_rules ) )
qrouter_ns = router1 . ns_name
fixed_ip_dist = distributed_fip [ ' fixed_ip_address ' ]
snat_ns = router1 . snat_namespace . name
fixed_ip_cent = centralized_floatingip [ ' fixed_ip_address ' ]
self . assertFalse ( self . _fixed_ip_rule_exists ( qrouter_ns , fixed_ip_cent ) )
self . assertTrue ( self . _fixed_ip_rule_exists ( qrouter_ns , fixed_ip_dist ) )
self . assertFalse ( self . _fixed_ip_rule_exists ( snat_ns , fixed_ip_dist ) )
self . assertFalse ( self . _fixed_ip_rule_exists ( snat_ns , fixed_ip_cent ) )
# Now let us edit the centralized floatingIP info with 'host'
# and remove the 'dvr_snat_bound'
router1 . router [ lib_constants . FLOATINGIP_KEY ] [ 1 ] [ ' host ' ] = (
self . agent . conf . host )
del router1 . router [ lib_constants . FLOATINGIP_KEY ] [ 0 ] [ ' dvr_snat_bound ' ]
del router1 . router [ lib_constants . FLOATINGIP_KEY ] [ 1 ] [ ' dvr_snat_bound ' ]
self . agent . _process_updated_router ( router1 . router )
router_updated = self . agent . router_info [ router_info [ ' id ' ] ]
router_ns = router_updated . ns_name
self . assertTrue ( self . _fixed_ip_rule_exists ( router_ns , fixed_ip ) )
qrouter_ns = router_updated . ns_name
fixed_ip_dist = distributed_fip [ ' fixed_ip_address ' ]
snat_ns = router_updated . snat_namespace . name
fixed_ip_cent = centralized_floatingip [ ' fixed_ip_address ' ]
self . assertTrue ( self . _fixed_ip_rule_exists ( qrouter_ns , fixed_ip_dist ) )
self . assertFalse ( self . _fixed_ip_rule_exists ( snat_ns , fixed_ip_dist ) )
self . assertTrue ( self . _fixed_ip_rule_exists ( qrouter_ns , fixed_ip_cent ) )
self . assertFalse ( self . _fixed_ip_rule_exists ( snat_ns , fixed_ip_cent ) )
self . assertTrue ( self . _namespace_exists ( fip_ns ) )
def test_floating_ip_not_deployed_on_dvr_no_external_agent ( self ) :
@ -1140,8 +1185,7 @@ class TestDvrRouter(framework.L3AgentTestFramework):
# configured in the SNAT Namespace and not in the router-ns.
for fip in centralized_floatingips :
expected_rules = router1 . floating_forward_rules ( fip )
self . assertFalse ( self . _assert_iptables_rules_exist (
router1 . iptables_manager , ' nat ' , expected_rules ) )
self . assertEqual ( 0 , len ( expected_rules ) )
def test_floating_ip_create_does_not_raise_keyerror_on_missing_host ( self ) :
""" Test to check floating ips configure does not raise Keyerror. """