@ -1793,36 +1793,36 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
self . conf . set_override ( ' router_id ' , ' 1234 ' )
self . _configure_metadata_proxy ( )
def test_process_routers_update_rpc_timeout_on_get_routers ( self ) :
def _test_process_routers_update_rpc_timeout ( self , ext_net_call = False ,
ext_net_call_failed = False ) :
agent = l3_agent . L3NATAgent ( HOSTNAME , self . conf )
agent . fullsync = False
agent . _process_router_if_compatible = mock . Mock ( )
self . plugin_api . get_routers . side_effect = (
oslo_messaging . MessagingTimeout )
if ext_net_call_failed :
agent . _process_router_if_compatible . side_effect = (
oslo_messaging . MessagingTimeout )
agent . _queue = mock . Mock ( )
agent . _resync_router = mock . Mock ( )
update = mock . Mock ( )
update . router = None
agent . _queue . each_update_to_next_router . side_effect = [
[ ( None , update ) ] ]
agent . _process_router_update ( )
self . assertTrue ( agent . fullsync )
self . assertFalse ( agent . _process_router_if_compatible . called )
self . assertFalse ( agent . fullsync )
self . assertEqual ( ext_net_call ,
agent . _process_router_if_compatible . called )
agent . _resync_router . assert_called_with ( update )
def test_process_routers_update_rpc_timeout_on_get_ext_net ( self ) :
agent = l3_agent . L3NATAgent ( HOSTNAME , self . conf )
agent . fullsync = False
agent . _process_router_if_compatible = mock . Mock ( )
agent . _process_router_if_compatible . side_effect = (
def test_process_routers_update_rpc_timeout_on_get_routers ( self ) :
self . plugin_api . get_routers . side_effect = (
oslo_messaging . MessagingTimeout )
agent . _queue = mock . Mock ( )
agent . _queue . each_update_to_next_router . side_effect = [
[ ( None , mock . Mock ( ) ) ] ]
self . _test_process_routers_update_rpc_timeout ( )
agent . _process_router_update ( )
self . assertTrue ( agent . fullsync )
def test_process_routers_update_rpc_timeout_on_get_ext_net ( self ) :
self . _test_process_routers_update_rpc_timeout ( ext_net_call = True ,
ext_net_call_failed = True )
def test_process_routers_update_router_deleted( self ) :
def _ test_process_routers_update_router_deleted( self , error = False ) :
agent = l3_agent . L3NATAgent ( HOSTNAME , self . conf )
agent . _queue = mock . Mock ( )
update = mock . Mock ( )
@ -1833,11 +1833,26 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
router_processor = mock . Mock ( )
agent . _queue . each_update_to_next_router . side_effect = [
[ ( router_processor , update ) ] ]
agent . _resync_router = mock . Mock ( )
if error :
agent . _safe_router_removed = mock . Mock ( )
agent . _safe_router_removed . return_value = False
agent . _process_router_update ( )
router_info . delete . assert_called_once_with ( agent )
self . assertFalse ( agent . router_info )
router_processor . fetched_and_processed . assert_called_once_with (
update . timestamp )
if error :
self . assertFalse ( router_processor . fetched_and_processed . called )
agent . _resync_router . assert_called_with ( update )
else :
router_info . delete . assert_called_once_with ( agent )
self . assertFalse ( agent . router_info )
self . assertFalse ( agent . _resync_router . called )
router_processor . fetched_and_processed . assert_called_once_with (
update . timestamp )
def test_process_routers_update_router_deleted_success ( self ) :
self . _test_process_routers_update_router_deleted ( )
def test_process_routers_update_router_deleted_error ( self ) :
self . _test_process_routers_update_router_deleted ( True )
def test_process_router_if_compatible_with_no_ext_net_in_conf ( self ) :
agent = l3_agent . L3NATAgent ( HOSTNAME , self . conf )