use payloads for ROUTER_INTERFACE events

This patch switches the code over to the payload style of callbacks [1]
for ROUTER_INTERFACE events for those that are not using them yet.
The unit tests are also updated where needed to account for the
payload style callbacks and publish() method. In addition, a few
callback methods that use the retry_if_session_inactive() decorator are
separated out from the callback so that the context can still be
passed and detected by retry_if_session_inactive logic.

NeutronLibImpact

[1]
https://docs.openstack.org/neutron-lib/latest/contributor/callbacks.html

Change-Id: I8d9f8296952dfb10fcccd6afd72e90a5d4f379eb
This commit is contained in:
Nurmatov Mamatisa 2021-05-22 07:13:38 +03:00 committed by Mamatisa Nurmatov
parent 7e98d18927
commit 4ab699e5cd
7 changed files with 152 additions and 127 deletions

View File

@ -274,17 +274,18 @@ class DhcpAgentNotifyAPI(object):
{'admin_state_up': admin_state_up}, host)
def _after_router_interface_created(self, resource, event, trigger,
**kwargs):
self._notify_agents(kwargs['context'], 'port_create_end',
{'port': kwargs['port']},
kwargs['port']['network_id'])
payload=None):
port = payload.metadata.get('port')
self._notify_agents(payload.context, 'port_create_end',
{'port': port},
port['network_id'])
def _after_router_interface_deleted(self, resource, event, trigger,
**kwargs):
self._notify_agents(kwargs['context'], 'port_delete_end',
{'port_id': kwargs['port']['id'],
'fixed_ips': kwargs['port']['fixed_ips']},
kwargs['port']['network_id'])
payload=None):
port = payload.metadata.get('port')
self._notify_agents(payload.context, 'port_delete_end',
{'port_id': port['id']},
port['network_id'])
def _native_event_send_dhcp_notification_payload(
self, resource, event, trigger, payload=None):

View File

@ -747,15 +747,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
prevent the port to be attach to the router.
"""
try:
registry.notify(resources.ROUTER_INTERFACE,
events.BEFORE_CREATE,
self,
context=context,
router_db=router_db,
port=port,
interface_info=interface_info,
router_id=router_db.id,
network_id=port['network_id'])
metadata = {
'port': port, 'interface_info': interface_info,
'network_id': port['network_id']}
registry.publish(resources.ROUTER_INTERFACE,
events.BEFORE_CREATE, self,
payload=events.DBEventPayload(
context, states=(router_db,),
metadata=metadata,
resource_id=router_db.id))
except exceptions.CallbackFailure as e:
# raise the underlying exception
reason = (_('cannot perform router interface attachment '
@ -888,19 +888,20 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
gw_network_id = router.gw_port.network_id
gw_ips = [x['ip_address'] for x in router.gw_port.fixed_ips]
registry.notify(resources.ROUTER_INTERFACE,
events.AFTER_CREATE,
self,
context=context,
network_id=gw_network_id,
gateway_ips=gw_ips,
cidrs=[x['cidr'] for x in subnets],
subnets=subnets,
port_id=port['id'],
router_id=router_id,
port=port,
new_interface=new_router_intf,
interface_info=interface_info)
cidrs = [x['cidr'] for x in subnets]
metadata = {'interface_info': interface_info,
'new_interface': new_router_intf,
'port': port,
'subnets': subnets,
'cidrs': cidrs,
'gateway_ips': gw_ips,
'network_id': gw_network_id}
registry.publish(resources.ROUTER_INTERFACE,
events.AFTER_CREATE, self,
payload=events.DBEventPayload(
context, metadata=metadata,
states=(router,),
resource_id=router_id))
return self._make_router_interface_info(
router.id, port['tenant_id'], port['id'], port['network_id'],
@ -1040,16 +1041,16 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
gw_network_id = router.gw_port.network_id
gw_ips = [x['ip_address'] for x in router.gw_port.fixed_ips]
registry.notify(resources.ROUTER_INTERFACE,
events.AFTER_DELETE,
self,
context=context,
cidrs=[x['cidr'] for x in subnets],
network_id=gw_network_id,
gateway_ips=gw_ips,
port=port,
router_id=router_id,
interface_info=interface_info)
cidrs = [x['cidr'] for x in subnets]
metadata = {'interface_info': interface_info,
'port': port, 'gateway_ips': gw_ips,
'network_id': gw_network_id, 'cidrs': cidrs}
registry.publish(resources.ROUTER_INTERFACE,
events.AFTER_DELETE, self,
payload=events.DBEventPayload(
context, metadata=metadata,
resource_id=router_id))
return self._make_router_interface_info(router_id, port['tenant_id'],
port['id'], port['network_id'],
subnets[0]['id'],

View File

@ -497,30 +497,34 @@ class DVRResourceOperationHandler(object):
self.update_arp_entry_for_dvr_service_port(context,
service_port_dict)
@registry.receives(resources.ROUTER_INTERFACE, [events.BEFORE_CREATE])
@db_api.retry_if_session_inactive()
def _add_csnat_on_interface_create(self, resource, event, trigger,
context, router_db, port, **kwargs):
"""Event handler to for csnat port creation on interface creation."""
if not router_db.extra_attributes.distributed or not router_db.gw_port:
def _retry_add_csnat_on_interface_create(self, context, payload):
router_db = payload.latest_state
if not router_db.extra_attributes.distributed or not \
router_db.gw_port:
return
admin_context = n_utils.get_elevated_context(context)
admin_context = payload.context.elevated()
port = payload.metadata.get('port')
self._add_csnat_router_interface_port(
admin_context, router_db, port['network_id'],
admin_context, payload.latest_state,
port['network_id'],
[{'subnet_id': port['fixed_ips'][-1]['subnet_id']}])
@registry.receives(resources.ROUTER_INTERFACE, [events.AFTER_CREATE])
@registry.receives(resources.ROUTER_INTERFACE, [events.BEFORE_CREATE])
def _add_csnat_on_interface_create(self, resource, event, trigger,
payload=None):
"""Event handler to for csnat port creation on interface creation."""
return self._retry_add_csnat_on_interface_create(
payload.context, payload)
@db_api.retry_if_session_inactive()
def _update_snat_v6_addrs_after_intf_update(self, resource, event, trigger,
context, subnets, port,
router_id, new_interface,
**kwargs):
if new_interface:
def _retry_update_snat_v6_addrs_after_intf_update(self, context, payload):
if payload.metadata.get('new_interface'):
# _add_csnat_on_interface_create handler deals with new ports
return
# if not a new interface, the interface was added to a new subnet,
# which is the first in this list
subnet = subnets[0]
subnet = payload.metadata.get('subnets')[0]
if not subnet or subnet['ip_version'] != 6:
return
# NOTE: For IPv6 additional subnets added to the same
@ -528,8 +532,8 @@ class DVRResourceOperationHandler(object):
# IPv6 subnet
# Add new prefix to an existing ipv6 csnat port with the
# same network id if one exists
admin_ctx = n_utils.get_elevated_context(context)
router = self.l3plugin._get_router(admin_ctx, router_id)
admin_ctx = payload.context.elevated()
router = self.l3plugin._get_router(admin_ctx, payload.resource_id)
cs_port = self._find_v6_router_port_by_network_and_device_owner(
router, subnet['network_id'], const.DEVICE_OWNER_ROUTER_SNAT)
if not cs_port:
@ -537,9 +541,11 @@ class DVRResourceOperationHandler(object):
new_fixed_ip = {'subnet_id': subnet['id']}
fixed_ips = list(cs_port['fixed_ips'])
fixed_ips.append(new_fixed_ip)
port = payload.metadata.get('port')
try:
updated_port = self._core_plugin.update_port(
admin_ctx, cs_port['id'], {'port': {'fixed_ips': fixed_ips}})
admin_ctx, cs_port['id'],
{'port': {'fixed_ips': fixed_ips}})
except Exception:
with excutils.save_and_reraise_exception():
# we need to try to undo the updated router
@ -557,10 +563,11 @@ class DVRResourceOperationHandler(object):
# future with a compare-and-swap style update
# using the revision number of the port.
p = self._core_plugin.get_port(admin_ctx, port['id'])
rollback_fixed_ips = [ip for ip in p['fixed_ips']
if ip['subnet_id'] != subnet['id']]
upd = {'port': {'fixed_ips': rollback_fixed_ips}}
self._core_plugin.update_port(admin_ctx, port['id'], upd)
fixed_ips = [ip for ip in p['fixed_ips']
if ip['subnet_id'] != subnet['id']]
upd = {'port': {'fixed_ips': fixed_ips}}
self._core_plugin.update_port(
admin_ctx, port['id'], upd)
try:
revert()
except Exception:
@ -569,8 +576,14 @@ class DVRResourceOperationHandler(object):
port['id'])
LOG.debug("CSNAT port updated for IPv6 subnet: %s", updated_port)
def _find_v6_router_port_by_network_and_device_owner(self, router, net_id,
device_owner):
@registry.receives(resources.ROUTER_INTERFACE, [events.AFTER_CREATE])
def _update_snat_v6_addrs_after_intf_update(self, resource, event, trigger,
payload=None):
return self._retry_update_snat_v6_addrs_after_intf_update(
payload.context, payload)
def _find_v6_router_port_by_network_and_device_owner(
self, router, net_id, device_owner):
for port in router.attached_ports:
p = port['port']
if (p['network_id'] == net_id and
@ -635,13 +648,11 @@ class DVRResourceOperationHandler(object):
self.related_dvr_router_routers[cache_key] = (
existing_routers | other_routers)
@registry.receives(resources.ROUTER_INTERFACE, [events.AFTER_DELETE])
@db_api.retry_if_session_inactive()
def _cleanup_after_interface_removal(self, resource, event, trigger,
context, port, interface_info,
router_id, **kwargs):
"""Handler to cleanup distributed resources after intf removal."""
router = self.l3plugin._get_router(context, router_id)
def _retry_cleanup_after_interface_removal(self, context, payload):
context = payload.context
port = payload.metadata.get('port')
router = self.l3plugin._get_router(context, payload.resource_id)
if not router.extra_attributes.distributed:
return
@ -649,29 +660,31 @@ class DVRResourceOperationHandler(object):
# the removed port's subnets and then subtract out any hosts still
# hosting the router for the remaining interfaces
router_hosts_for_removed = self.l3plugin._get_dvr_hosts_for_subnets(
context, subnet_ids={ip['subnet_id'] for ip in port['fixed_ips']})
context,
subnet_ids={ip['subnet_id'] for ip in port['fixed_ips']})
router_hosts_after = self.l3plugin._get_dvr_hosts_for_router(
context, router_id)
context, payload.resource_id)
removed_hosts = set(router_hosts_for_removed) - set(router_hosts_after)
if removed_hosts:
# Get hosts where this router is placed as "related" to other dvr
# routers and don't remove it from such hosts
related_hosts = self._get_other_dvr_hosts(context, router_id)
related_hosts = self._get_other_dvr_hosts(context,
payload.resource_id)
agents = self.l3plugin.get_l3_agents(
context, filters={'host': removed_hosts})
bindings = rb_obj.RouterL3AgentBinding.get_objects(
context, router_id=router_id)
context, router_id=payload.resource_id)
snat_binding = bindings.pop() if bindings else None
connected_dvr_routers = set(
self.l3plugin._get_other_dvr_router_ids_connected_router(
context, router_id))
context, payload.resource_id))
for agent in agents:
is_this_snat_agent = (
snat_binding and snat_binding.l3_agent_id == agent['id'])
if (not is_this_snat_agent and
agent['host'] not in related_hosts):
self.l3plugin.l3_rpc_notifier.router_removed_from_agent(
context, router_id, agent['host'])
context, payload.resource_id, agent['host'])
for connected_router_id in connected_dvr_routers:
connected_router_hosts = set(
self.l3plugin._get_dvr_hosts_for_router(
@ -680,17 +693,18 @@ class DVRResourceOperationHandler(object):
self._get_other_dvr_hosts(
context, connected_router_id))
if agent['host'] not in connected_router_hosts:
self.l3plugin.l3_rpc_notifier.\
self.l3plugin.l3_rpc_notifier. \
router_removed_from_agent(
context, connected_router_id,
agent['host'])
# if subnet_id not in interface_info, request was to remove by port
interface_info = payload.metadata.get('interface_info')
sub_id = (interface_info.get('subnet_id') or
port['fixed_ips'][0]['subnet_id'])
self._cleanup_related_hosts_after_interface_removal(
context, router_id, sub_id)
context, payload.resource_id, sub_id)
self._cleanup_related_routers_after_interface_removal(
context, router_id, sub_id)
context, payload.resource_id, sub_id)
is_multiple_prefix_csport = (
self._check_for_multiprefix_csnat_port_and_update(
context, router, port['network_id'], sub_id))
@ -700,6 +714,13 @@ class DVRResourceOperationHandler(object):
n_utils.get_elevated_context(context),
router, subnet_id=sub_id)
@registry.receives(resources.ROUTER_INTERFACE, [events.AFTER_DELETE])
def _cleanup_after_interface_removal(self, resource, event, trigger,
payload=None):
"""Handler to cleanup distributed resources after intf removal."""
return self._retry_cleanup_after_interface_removal(
payload.context, payload)
def _cleanup_related_hosts_after_interface_removal(
self, context, router_id, subnet_id):
router_hosts = self.l3plugin._get_dvr_hosts_for_router(

View File

@ -1970,27 +1970,22 @@ class L3DvrTestCase(L3DvrTestCaseBase):
interface_info = {'subnet_id': subnet['subnet']['id']}
self.l3_plugin.add_router_interface(
self.context, router['id'], interface_info)
kwargs = {'context': self.context, 'router_id': router['id'],
'network_id': net['network']['id'],
'router_db': mock.ANY,
'port': mock.ANY,
'interface_info': interface_info}
notif_handler_before.callback.assert_called_once_with(
resources.ROUTER_INTERFACE, events.BEFORE_CREATE,
mock.ANY, **kwargs)
kwargs_after = {'cidrs': mock.ANY,
'context': mock.ANY,
'gateway_ips': mock.ANY,
'interface_info': mock.ANY,
'network_id': None,
'port': mock.ANY,
'new_interface': True,
'subnets': mock.ANY,
'port_id': mock.ANY,
'router_id': router['id']}
mock.ANY, payload=mock.ANY)
payload = notif_handler_before.mock_calls[0][2]['payload']
self.assertEqual(self.context, payload.context)
self.assertEqual(router['id'], payload.resource_id)
self.assertEqual(net['network']['id'],
payload.metadata.get('network_id'))
self.assertEqual(interface_info,
payload.metadata.get('interface_info'))
notif_handler_after.callback.assert_called_once_with(
resources.ROUTER_INTERFACE, events.AFTER_CREATE,
mock.ANY, **kwargs_after)
mock.ANY, payload=mock.ANY)
payload = notif_handler_before.mock_calls[0][2]['payload']
self.assertEqual(router['id'], payload.resource_id)
def test_add_router_interface_by_port_notifications(self):
notif_handler_before = mock.Mock()
@ -2007,28 +2002,26 @@ class L3DvrTestCase(L3DvrTestCaseBase):
self.port(subnet=subnet) as port:
interface_info = {'port_id': port['port']['id']}
self.l3_plugin.add_router_interface(
self.context, router['id'], interface_info)
kwargs = {'context': self.context, 'router_id': router['id'],
'network_id': net['network']['id'],
'router_db': mock.ANY,
'port': mock.ANY,
'interface_info': interface_info}
self.context, router['id'], interface_info)
notif_handler_before.callback.assert_called_once_with(
resources.ROUTER_INTERFACE, events.BEFORE_CREATE,
mock.ANY, **kwargs)
kwargs_after = {'cidrs': mock.ANY,
'context': mock.ANY,
'gateway_ips': mock.ANY,
'interface_info': mock.ANY,
'network_id': None,
'port': mock.ANY,
'new_interface': True,
'subnets': mock.ANY,
'port_id': port['port']['id'],
'router_id': router['id']}
mock.ANY, payload=mock.ANY)
payload = notif_handler_before.mock_calls[0][2]['payload']
self.assertEqual(interface_info,
payload.metadata.get('interface_info'))
self.assertEqual(net['network']['id'],
payload.metadata.get('network_id'))
self.assertEqual(router['id'], payload.resource_id)
self.assertEqual(self.context, payload.context)
notif_handler_after.callback.assert_called_once_with(
resources.ROUTER_INTERFACE, events.AFTER_CREATE,
mock.ANY, **kwargs_after)
mock.ANY, payload=mock.ANY)
payload = notif_handler_after.mock_calls[0][2]['payload']
self.assertEqual(port['port']['id'],
payload.metadata.get('port').get('id'))
self.assertEqual(router['id'], payload.resource_id)
class L3DvrTestCaseMigration(L3DvrTestCaseBase):

View File

@ -238,19 +238,23 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
expected_scheduling=0, expected_casts=0)
def test__notify_agents_with_router_interface_add(self):
payload = events.DBEventPayload(
mock.Mock(), metadata={
'port': {'id': 'foo_port_id',
'network_id': 'foo_network_id'}})
self._test__notify_agents_with_function(
lambda: self.notifier._after_router_interface_created(
mock.ANY, mock.ANY, mock.ANY, context=mock.Mock(),
port={'id': 'foo_port_id', 'network_id': 'foo_network_id'}),
mock.ANY, mock.ANY, mock.ANY, payload=payload),
expected_scheduling=1, expected_casts=1)
def test__notify_agents_with_router_interface_delete(self):
payload = events.DBEventPayload(
mock.Mock(), metadata={
'port': {'id': 'foo_port_id',
'network_id': 'foo_network_id'}})
self._test__notify_agents_with_function(
lambda: self.notifier._after_router_interface_deleted(
mock.ANY, mock.ANY, mock.ANY, context=mock.Mock(),
port={'id': 'foo_port_id', 'network_id': 'foo_network_id',
'fixed_ips': {'subnet_id': 'subnet1',
'ip_address': '10.0.0.1'}}),
mock.ANY, mock.ANY, mock.ANY, payload=payload),
expected_scheduling=0, expected_casts=1)
def test__fanout_message(self):

View File

@ -275,7 +275,7 @@ class TestL3_NAT_dbonly_mixin(
mock.ANY, fip, floatingip_obj)
def test__notify_attaching_interface(self):
with mock.patch.object(l3_db.registry, 'notify') as mock_notify:
with mock.patch.object(l3_db.registry, 'publish') as mock_notify:
context = mock.MagicMock()
router_id = 'router_id'
net_id = 'net_id'
@ -284,12 +284,17 @@ class TestL3_NAT_dbonly_mixin(
port = {'network_id': net_id}
intf = {}
self.db._notify_attaching_interface(context, router_db, port, intf)
kwargs = {'context': context, 'router_id': router_id,
'network_id': net_id, 'interface_info': intf,
'router_db': router_db, 'port': port}
mock_notify.assert_called_once_with(
resources.ROUTER_INTERFACE, events.BEFORE_CREATE, self.db,
**kwargs)
payload=mock.ANY)
payload = mock_notify.mock_calls[0][2]['payload']
self.assertEqual(context, payload.context)
self.assertEqual(router_id, payload.resource_id)
self.assertEqual(net_id, payload.metadata.get('network_id'))
self.assertEqual(intf, payload.metadata.get('interface_info'))
self.assertEqual(router_db, payload.latest_state)
self.assertEqual(port, payload.metadata.get('port'))
def test__create_gw_port(self):
# NOTE(slaweq): this test is probably wrong

View File

@ -2113,7 +2113,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
def test_router_remove_interface_callback_failure_returns_409(self):
with self.router() as r,\
self.subnet() as s,\
mock.patch.object(registry, 'notify') as notify:
mock.patch.object(registry, 'publish') as notify:
errors = [
exceptions.NotificationError(
'foo_callback_id', n_exc.InUse()),
@ -2138,7 +2138,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
def test_router_clear_gateway_callback_failure_returns_409(self):
with self.router() as r,\
self.subnet() as s,\
mock.patch.object(registry, 'notify') as notify:
mock.patch.object(registry, 'publish') as notify:
errors = [
exceptions.NotificationError(
'foo_callback_id', n_exc.InUse()),