@ -115,6 +115,64 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self . assertEqual ( 4 , default_rules_list_count )
self . assertEqual ( 2 , interface_rules_list_count )
def test_dvr_update_gateway_port_no_fip_fg_port_recovers_itself_with_fpr (
self ) :
self . agent . conf . agent_mode = ' dvr '
# Create the router with external net
router_info = self . generate_dvr_router_info ( )
external_gw_port = router_info [ ' gw_port ' ]
ext_net_id = router_info [ ' _floatingips ' ] [ 0 ] [ ' floating_network_id ' ]
self . mock_plugin_api . get_external_network_id . return_value = ext_net_id
router = self . manage_router ( self . agent , router_info )
fg_port = router . fip_ns . agent_gateway_port
fg_port_name = router . fip_ns . get_ext_device_name ( fg_port [ ' id ' ] )
fg_device = ip_lib . IPDevice ( fg_port_name ,
namespace = router . fip_ns . name )
fip_2_rtr_name = router . fip_ns . get_int_device_name ( router . router_id )
fpr_device = ip_lib . IPDevice ( fip_2_rtr_name ,
namespace = router . fip_ns . name )
# Now validate if the gateway is properly configured.
rtr_2_fip , fip_2_rtr = router . rtr_fip_subnet . get_pair ( )
tbl_index = router . _get_snat_idx ( fip_2_rtr )
tbl_filter = [ ' table ' , tbl_index ]
self . assertIn ( ' gateway ' , fg_device . route . get_gateway (
filters = tbl_filter ) )
self . _validate_fips_for_external_network (
router , router . fip_ns . get_name ( ) )
# Now delete the fg- port that was created
ext_net_bridge = self . agent . conf . external_network_bridge
router . fip_ns . driver . unplug ( fg_port_name ,
bridge = ext_net_bridge ,
namespace = router . fip_ns . name ,
prefix = dvr_fip_ns . FIP_EXT_DEV_PREFIX )
# Now check if the fg- port is missing.
self . assertFalse ( fg_device . exists ( ) )
fpr_device . link . set_down ( )
# Now change the gateway ip for the router and do an update.
router . ex_gw_port = copy . deepcopy ( router . ex_gw_port )
new_fg_port = copy . deepcopy ( fg_port )
for subnet in new_fg_port [ ' subnets ' ] :
subnet [ ' gateway_ip ' ] = ' 19.4.4.2 '
router . router [ n_const . FLOATINGIP_AGENT_INTF_KEY ] = [ new_fg_port ]
self . assertRaises ( n_exc . FloatingIpSetupException ,
self . agent . _process_updated_router ,
router . router )
self . agent . _process_updated_router ( router . router )
self . assertTrue ( fg_device . exists ( ) )
self . assertTrue ( fpr_device . exists ( ) )
updated_route = fg_device . route . list_routes (
ip_version = lib_constants . IP_VERSION_4 ,
table = tbl_index )
expected_route = [ { ' cidr ' : ' 0.0.0.0/0 ' ,
' dev ' : fg_port_name ,
' table ' : tbl_index ,
u ' via ' : u ' 19.4.4.2 ' } ]
self . assertEqual ( expected_route , updated_route )
self . _validate_fips_for_external_network (
router , router . fip_ns . get_name ( ) )
self . _delete_router ( self . agent , router . router_id )
self . _assert_fip_namespace_deleted ( external_gw_port )
def test_dvr_update_gateway_port_with_no_gw_port_in_namespace ( self ) :
self . agent . conf . agent_mode = ' dvr '