@ -1943,6 +1943,22 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
network_id = n [ ' network ' ] [ ' id ' ] ,
subnets = [ ] )
def test_router_add_gateway_notifications ( self ) :
call_count_total = 3
with self . router ( ) as r :
with self . network ( ) as n :
self . _set_net_external ( n [ ' network ' ] [ ' id ' ] )
with mock . patch . object ( registry , ' notify ' ) as notify :
self . _add_external_gateway_to_router (
r [ ' router ' ] [ ' id ' ] , n [ ' network ' ] [ ' id ' ] )
self . assertEqual ( call_count_total , notify . call_count )
expected = [ mock . call (
resources . ROUTER_GATEWAY ,
events . AFTER_CREATE , mock . ANY ,
context = mock . ANY , gw_ips = mock . ANY ,
network_id = mock . ANY , router_id = mock . ANY ) ]
notify . assert_has_calls ( expected )
def test_router_remove_interface_inuse_returns_409 ( self ) :
with self . router ( ) as r :
with self . subnet ( ) as s :
@ -3754,6 +3770,36 @@ class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
registry . unsubscribe ( fake_method , resources . FLOATING_IP ,
events . PRECOMMIT_CREATE )
def test_floatingip_delete_after_event ( self ) :
fake_method = mock . Mock ( )
try :
registry . subscribe ( fake_method , resources . FLOATING_IP ,
events . AFTER_DELETE )
with self . subnet ( cidr = ' 11.0.0.0/24 ' ) as public_sub :
self . _set_net_external ( public_sub [ ' subnet ' ] [ ' network_id ' ] )
f = self . _make_floatingip ( self . fmt ,
public_sub [ ' subnet ' ] [ ' network_id ' ] ,
port_id = None ,
fixed_ip = None ,
set_context = True )
self . _delete ( ' floatingips ' , f [ ' floatingip ' ] [ ' id ' ] )
fake_method . assert_called_once_with (
resources . FLOATING_IP , events . AFTER_DELETE , mock . ANY ,
context = mock . ANY , description = mock . ANY ,
dns_domain = mock . ANY , dns_name = mock . ANY ,
fixed_ip_address = f [ ' floatingip ' ] [ ' fixed_ip_address ' ] ,
floating_ip_address = f [ ' floatingip ' ] [ ' floating_ip_address ' ] ,
floating_network_id = f [ ' floatingip ' ] [ ' floating_network_id ' ] ,
id = f [ ' floatingip ' ] [ ' id ' ] ,
port_id = f [ ' floatingip ' ] [ ' port_id ' ] ,
project_id = f [ ' floatingip ' ] [ ' project_id ' ] ,
router_id = f [ ' floatingip ' ] [ ' router_id ' ] ,
status = f [ ' floatingip ' ] [ ' status ' ] ,
tenant_id = f [ ' floatingip ' ] [ ' tenant_id ' ] )
finally :
registry . unsubscribe ( fake_method , resources . FLOATING_IP ,
events . AFTER_DELETE )
def test_router_create_precommit_event ( self ) :
nset = lambda * a , * * k : setattr ( k [ ' router_db ' ] , ' name ' , ' hello ' )
registry . subscribe ( nset , resources . ROUTER , events . PRECOMMIT_CREATE )