Use payloads for FloatingIP AFTER callbacks

Change FloatingIP AFTER_ notifications in l3_db to use publish calls.
Move association_event field into Payload metadata.

Closes-Bug: #1933502
Change-Id: Ie4c0f4a63a87c32026c49b03068e5f461deb38b6
This commit is contained in:
Szymon Wroblewski 2021-07-19 12:25:32 +02:00
parent b6bb4c0941
commit b488fb8e22
4 changed files with 134 additions and 61 deletions

View File

@ -187,8 +187,6 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
"deleting.", port_id)
self._core_plugin.delete_port(
context, port_id, l3_port_check=False)
registry.notify(resources.FLOATING_IP, events.AFTER_DELETE,
self, context=context, **fips[0])
def _get_dead_floating_port_candidates(self, context):
filters = {'device_id': ['PENDING'],
@ -1334,7 +1332,6 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
'floating_ip_address': floating_ip_address,
'floating_network_id': floatingip_obj.floating_network_id,
'floating_ip_id': floatingip_obj.id,
'context': context,
'association_event': association_event}
def _is_ipv4_network(self, context, net_db):
@ -1437,10 +1434,13 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
context.elevated(), external_port['id'],
{'port': {'device_id': fip_id,
'project_id': fip['tenant_id']}})
registry.notify(resources.FLOATING_IP,
events.AFTER_UPDATE,
self._update_fip_assoc,
**assoc_result)
registry.publish(
resources.FLOATING_IP, events.AFTER_CREATE, self,
payload=events.DBEventPayload(
context, states=(assoc_result,),
resource_id=floatingip_obj.id,
metadata={
'association_event': assoc_result['association_event']}))
if assoc_result['association_event']:
LOG.info(FIP_ASSOC_MSG,
{'fip_id': assoc_result['floating_ip_id'],
@ -1502,10 +1502,13 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
desired_state=floatingip_db,
states=(old_floatingip, floatingip)))
registry.notify(resources.FLOATING_IP,
events.AFTER_UPDATE,
self._update_fip_assoc,
**assoc_result)
registry.publish(
resources.FLOATING_IP, events.AFTER_UPDATE, self,
payload=events.DBEventPayload(
context, states=(assoc_result,),
resource_id=floatingip_obj.id,
metadata={
'association_event': assoc_result['association_event']}))
if assoc_result['association_event'] is not None:
port_id = old_fixed_port_id or assoc_result['fixed_port_id']
assoc = ('associated' if assoc_result['association_event']
@ -1563,8 +1566,11 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
self._core_plugin.delete_port(context.elevated(),
floatingip.floating_port_id,
l3_port_check=False)
registry.notify(resources.FLOATING_IP, events.AFTER_DELETE,
self, context=context, **floatingip_dict)
registry.publish(
resources.FLOATING_IP, events.AFTER_DELETE, self,
payload=events.DBEventPayload(
context, states=(floatingip_dict,),
resource_id=id))
if floatingip.fixed_port_id:
LOG.info(FIP_ASSOC_MSG,
{'fip_id': floatingip.id,
@ -1701,18 +1707,23 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
'fixed_ip_address': None,
'fixed_port_id': None,
'router_id': None,
'floating_ip_address': fip.floating_ip_address,
'floating_ip_address': (
str(fip.floating_ip_address)
if fip.floating_ip_address else None),
'floating_network_id': fip.floating_network_id,
'floating_ip_id': fip.id,
'context': context,
'router_ids': router_ids,
'association_event': False,
}
# Process DNS record removal after committing the transaction
if self._is_dns_integration_supported:
self._process_dns_floatingip_delete(context, fip.to_dict())
registry.notify(resources.FLOATING_IP, events.AFTER_UPDATE, self,
**assoc_result)
registry.publish(
resources.FLOATING_IP, events.AFTER_UPDATE, self,
payload=events.DBEventPayload(
context, states=(assoc_result,),
resource_id=fip.id,
metadata={'association_event': False}))
for fip in old_fips.values():
LOG.info(FIP_ASSOC_MSG,
{'fip_id': fip['id'],

View File

@ -447,24 +447,24 @@ class DVRResourceOperationHandler(object):
models_v2.Port.admin_state_up == True) # noqa
return query.all()
@registry.receives(resources.FLOATING_IP, [events.AFTER_UPDATE])
def _create_dvr_floating_gw_port(self, resource, event, trigger, context,
router_id, fixed_port_id, floating_ip_id,
floating_network_id, fixed_ip_address,
association_event, **kwargs):
@registry.receives(resources.FLOATING_IP, [events.AFTER_CREATE,
events.AFTER_UPDATE])
def _create_dvr_floating_gw_port(self, rtype, event, trigger, payload):
"""Create floating agent gw port for DVR.
Floating IP Agent gateway port will be created when a
floatingIP association happens.
"""
if association_event and router_id:
fip = payload.latest_state
context = payload.context
if payload.metadata['association_event'] and fip['router_id']:
admin_ctx = context.elevated()
router_dict = self.get_router(admin_ctx, router_id)
router_dict = self.get_router(admin_ctx, fip['router_id'])
# Check if distributed router and then create the
# FloatingIP agent gateway port
if router_dict.get('distributed'):
hostid = self._get_dvr_service_port_hostid(context,
fixed_port_id)
hostid = self._get_dvr_service_port_hostid(
context, fip['fixed_port_id'])
if hostid:
# FIXME (Swami): This FIP Agent Gateway port should be
# created only once and there should not be a duplicate
@ -473,7 +473,7 @@ class DVRResourceOperationHandler(object):
# existing flow.
fip_agent_port = (
self.create_fip_agent_gw_port_if_not_exists(
admin_ctx, floating_network_id, hostid))
admin_ctx, fip['floating_network_id'], hostid))
LOG.debug("FIP Agent gateway port: %s", fip_agent_port)
else:
# If not hostid check if the fixed ip provided has to
@ -481,7 +481,7 @@ class DVRResourceOperationHandler(object):
# port. Get the port_dict, inherit the service port host
# and device owner(if it does not exist).
port = self._core_plugin.get_port(
admin_ctx, fixed_port_id)
admin_ctx, fip['fixed_port_id'])
allowed_device_owners = (
n_utils.get_dvr_allowed_address_pair_device_owners())
# NOTE: We just need to deal with ports that do not
@ -493,7 +493,7 @@ class DVRResourceOperationHandler(object):
addr_pair_active_service_port_list = (
self._get_ports_for_allowed_address_pair_ip(
admin_ctx, port['network_id'],
fixed_ip_address))
fip['fixed_ip_address']))
if not addr_pair_active_service_port_list:
return
self._inherit_service_port_and_arp_update(

View File

@ -629,11 +629,16 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
grtr.return_value = router_db
vmp.return_value = 'my-host'
mvmp.return_value = 'my-future-host'
registry.notify(resources.FLOATING_IP, events.AFTER_UPDATE, self,
context=mock.Mock(), router_id=router_db['id'],
fixed_port_id=port['id'], floating_ip_id=fip['id'],
floating_network_id=fip['floating_network_id'],
fixed_ip_address='1.2.3.4', association_event=True)
registry.publish(
resources.FLOATING_IP, events.AFTER_CREATE, self,
payload=events.DBEventPayload(
context=mock.Mock(), states=(dict(
router_id=router_db['id'], fixed_port_id=port['id'],
floating_ip_id=fip['id'],
floating_network_id=fip['floating_network_id'],
fixed_ip_address='1.2.3.4'),),
resource_id=fip['id'],
metadata=dict(association_event=True)))
return c_fip
def test_create_floatingip_agent_gw_port_with_dvr_router(self):

View File

@ -2717,7 +2717,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
with self.floatingip_no_assoc(private_sub) as fip:
port_id = p['port']['id']
ip_address = p['port']['fixed_ips'][0]['ip_address']
with mock.patch.object(registry, 'notify') as notify:
with mock.patch.object(registry, 'publish') as publish:
body = self._update('floatingips',
fip['floatingip']['id'],
{'floatingip': {'port_id': port_id}})
@ -2726,18 +2726,25 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
fip_id = fip['floatingip']['id']
router_id = body['floatingip']['router_id']
body = self._show('routers', router_id)
notify.assert_any_call(resources.FLOATING_IP,
events.AFTER_UPDATE,
mock.ANY,
context=mock.ANY,
fixed_ip_address=ip_address,
fixed_port_id=port_id,
floating_ip_address=fip_addr,
floating_network_id=fip_network_id,
last_known_router_id=None,
floating_ip_id=fip_id,
router_id=router_id,
association_event=True)
publish.assert_any_call(
resources.FLOATING_IP, events.AFTER_UPDATE, mock.ANY,
payload=mock.ANY)
for _, call_args, call_kwargs in publish.mock_calls:
resource, event, trigger = call_args
if event == events.AFTER_UPDATE:
break
payload = call_kwargs['payload']
fip_dict = dict(
fixed_ip_address=ip_address,
fixed_port_id=port_id,
floating_ip_address=fip_addr,
floating_network_id=fip_network_id,
last_known_router_id=None,
floating_ip_id=fip_id,
router_id=router_id,
association_event=True)
self.assertDictEqual(fip_dict, payload.latest_state)
self.assertTrue(payload.metadata['association_event'])
def test_floatingip_disassociate_notification(self):
with self.port() as p:
@ -2748,7 +2755,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
body = self._update('floatingips',
fip['floatingip']['id'],
{'floatingip': {'port_id': port_id}})
with mock.patch.object(registry, 'notify') as notify:
with mock.patch.object(registry, 'publish') as publish:
fip_addr = fip['floatingip']['floating_ip_address']
fip_network_id = fip['floatingip']['floating_network_id']
fip_id = fip['floatingip']['id']
@ -2756,18 +2763,60 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self._update('floatingips',
fip['floatingip']['id'],
{'floatingip': {'port_id': None}})
notify.assert_any_call(resources.FLOATING_IP,
events.AFTER_UPDATE,
mock.ANY,
context=mock.ANY,
fixed_ip_address=None,
fixed_port_id=None,
floating_ip_address=fip_addr,
floating_network_id=fip_network_id,
last_known_router_id=router_id,
floating_ip_id=fip_id,
router_id=None,
association_event=False)
publish.assert_any_call(
resources.FLOATING_IP, events.AFTER_UPDATE, mock.ANY,
payload=mock.ANY)
for _, call_args, call_kwargs in publish.mock_calls:
resource, event, trigger = call_args
if event == events.AFTER_UPDATE:
break
payload = call_kwargs['payload']
fip_dict = dict(
fixed_ip_address=None,
fixed_port_id=None,
floating_ip_address=fip_addr,
floating_network_id=fip_network_id,
last_known_router_id=router_id,
floating_ip_id=fip_id,
router_id=None,
association_event=False)
self.assertDictEqual(fip_dict, payload.latest_state)
self.assertFalse(payload.metadata['association_event'])
def test_floatingip_disassociate_notification_port_delete(self):
with self.port() as p:
private_sub = {'subnet': {'id':
p['port']['fixed_ips'][0]['subnet_id']}}
with self.floatingip_no_assoc(private_sub) as fip:
port_id = p['port']['id']
body = self._update('floatingips',
fip['floatingip']['id'],
{'floatingip': {'port_id': port_id}})
with mock.patch.object(registry, 'publish') as publish:
fip_id = fip['floatingip']['id']
fip_address = fip['floatingip']['floating_ip_address']
f_network_id = fip['floatingip']['floating_network_id']
router_id = body['floatingip']['router_id']
self._delete('ports', p['port']['id'])
publish.assert_any_call(
resources.FLOATING_IP, events.AFTER_UPDATE, mock.ANY,
payload=mock.ANY)
for _, call_args, call_kwargs in publish.mock_calls:
resource, event, trigger = call_args
if event == events.AFTER_UPDATE:
break
payload = call_kwargs['payload']
fip_dict = dict(
fixed_ip_address=None,
fixed_port_id=None,
router_id=None,
floating_ip_address=fip_address,
floating_network_id=f_network_id,
floating_ip_id=fip_id,
router_ids={router_id, },
association_event=False)
self.assertDictEqual(fip_dict, payload.latest_state)
self.assertFalse(payload.metadata['association_event'])
def test_floatingip_association_on_unowned_router(self):
# create a router owned by one tenant and associate the FIP with a
@ -4027,7 +4076,14 @@ class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
self._delete('floatingips', f['floatingip']['id'])
fake_method.assert_called_once_with(
resources.FLOATING_IP, events.AFTER_DELETE, mock.ANY,
context=mock.ANY, description=mock.ANY,
payload=mock.ANY)
for _, call_args, call_kwargs in fake_method.mock_calls:
resource, event, trigger = call_args
if event == events.AFTER_DELETE:
break
payload = call_kwargs['payload']
fip_dict = dict(
description=mock.ANY,
dns_domain=mock.ANY, dns_name=mock.ANY,
fixed_ip_address=f['floatingip']['fixed_ip_address'],
floating_ip_address=f['floatingip']['floating_ip_address'],
@ -4039,6 +4095,7 @@ class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
status=f['floatingip']['status'],
tenant_id=f['floatingip']['tenant_id'],
standard_attr_id=mock.ANY)
self.assertDictEqual(fip_dict, payload.latest_state)
finally:
registry.unsubscribe(fake_method, resources.FLOATING_IP,
events.AFTER_DELETE)