use payloads for PORT AFTER_DELETE events

This patch switches over to callback payloads for PORT
AFTER_DELETE events.
Some shims were removed.

Change-Id: If69e37b84fe1b027777b1d673b3d08a6651a979e
This commit is contained in:
Nurmatov Mamatisa 2021-06-18 10:59:19 +03:00 committed by Mamatisa Nurmatov
parent 93ff5afdbf
commit 3cae410b30
12 changed files with 85 additions and 185 deletions

View File

@@ -111,22 +111,12 @@ class DhcpAgentNotifyAPI(object):
self.uses_native_notifications[resource] = {'create': False,
'update': False,
'delete': False}
callback = self._native_event_send_dhcp_notification
# TODO(boden): remove shim below once all events use payloads
if resource in [resources.NETWORK, resources.SUBNET]:
callback = self._native_event_send_dhcp_notification_payload
if resource == resources.PORT:
registry.subscribe(
self._native_event_send_dhcp_notification_payload,
resource, events.AFTER_CREATE)
registry.subscribe(
self._native_event_send_dhcp_notification_payload,
resource, events.AFTER_UPDATE)
else:
registry.subscribe(callback, resource, events.AFTER_CREATE)
registry.subscribe(callback, resource, events.AFTER_UPDATE)
registry.subscribe(callback, resource, events.AFTER_DELETE)
registry.subscribe(self._native_event_send_dhcp_notification,
resource, events.AFTER_CREATE)
registry.subscribe(self._native_event_send_dhcp_notification,
resource, events.AFTER_UPDATE)
registry.subscribe(self._native_event_send_dhcp_notification,
resource, events.AFTER_DELETE)
@property
def plugin(self):
@@ -291,11 +281,8 @@ class DhcpAgentNotifyAPI(object):
{'port_id': port['id']},
port['network_id'])
def _native_event_send_dhcp_notification_payload(
self, resource, event, trigger, payload=None):
# TODO(boden): collapse the native event methods back into one
def _native_event_send_dhcp_notification(self, resource, event, trigger,
payload):
action = event.replace('after_', '')
# we unsubscribe the _send_dhcp_notification method now that we know
# the loaded core plugin emits native resource events
@@ -309,30 +296,12 @@ class DhcpAgentNotifyAPI(object):
resource)
method_name = '.'.join((resource, action, 'end'))
data = {resource: payload.latest_state}
self.notify(payload.context, data, method_name)
def _native_event_send_dhcp_notification(self, resource, event, trigger,
context, **kwargs):
action = event.replace('after_', '')
# we unsubscribe the _send_dhcp_notification method now that we know
# the loaded core plugin emits native resource events
if resource not in self._unsubscribed_resources:
self.uses_native_notifications[resource][action] = True
if all(self.uses_native_notifications[resource].values()):
# only unsubscribe the API level listener if we are
# receiving all event types for this resource
self._unsubscribed_resources.append(resource)
registry.unsubscribe_by_resource(self._send_dhcp_notification,
resource)
method_name = '.'.join((resource, action, 'end'))
payload = kwargs[resource]
data = {resource: payload}
if resource == resources.PORT:
if self._only_status_changed(kwargs.get('original_port'),
kwargs.get('port')):
if resource == resources.PORT and event == events.AFTER_UPDATE:
if self._only_status_changed(payload.states[0],
payload.latest_state):
# don't waste time updating the DHCP agent for status updates
return
self.notify(context, data, method_name)
self.notify(payload.context, data, method_name)
def _only_status_changed(self, orig, new):
# a status change will manifest as a bumped revision number, a new

View File

@@ -1943,9 +1943,9 @@ class L3RpcNotifierMixin(object):
@staticmethod
@registry.receives(resources.PORT, [events.AFTER_DELETE])
def _notify_routers_callback(resource, event, trigger, **kwargs):
context = kwargs['context']
router_ids = kwargs['router_ids']
def _notify_routers_callback(resource, event, trigger, payload):
context = payload.context
router_ids = payload.metadata['router_ids']
l3plugin = directory.get_plugin(plugin_constants.L3)
if l3plugin:
l3plugin.notify_routers_updated(context, router_ids)

View File

@@ -546,10 +546,11 @@ def _notify_l3_agent_new_port(resource, event, trigger, payload=None):
l3plugin.update_arp_entry_for_dvr_service_port(context, port)
def _notify_port_delete(event, resource, trigger, **kwargs):
context = kwargs['context']
port = kwargs['port']
get_related_hosts_info = kwargs.get("get_related_hosts_info", True)
def _notify_port_delete(event, resource, trigger, payload):
context = payload.context
port = payload.latest_state
get_related_hosts_info = payload.metadata.get(
"get_related_hosts_info", True)
l3plugin = directory.get_plugin(plugin_constants.L3)
if port:
port_host = port.get(portbindings.HOST_ID)
@@ -607,14 +608,13 @@ def _notify_l3_agent_port_update(resource, event, trigger, payload):
original_port,
get_related_hosts_info=False)
if removed_routers:
removed_router_args = {
'context': context,
'port': original_port,
'removed_routers': removed_routers,
'get_related_hosts_info': False,
}
_notify_port_delete(
event, resource, trigger, **removed_router_args)
event, resource, trigger,
payload=events.DBEventPayload(
context,
metadata={'removed_routers': removed_routers,
'get_related_hosts_info': False},
states=(original_port,)))
def _should_notify_on_fip_update():
if not fip_router_id:

View File

@@ -42,11 +42,11 @@ class SecurityGroupServerNotifierRpcMixin(sg_db.SecurityGroupDbMixin):
"""Mixin class to add agent-based security group implementation."""
@registry.receives(resources.PORT, [events.AFTER_CREATE,
events.AFTER_UPDATE])
def _notify_sg_on_port_after_create_and_update(
self, resource, event, trigger, payload):
# TODO(boden): refact back into single method when all callbacks are
# moved to payload style events
events.AFTER_UPDATE,
events.AFTER_DELETE])
def _notify_sg_on_port_change(self, resource, event, trigger, payload):
"""Trigger notification to other SG members on port changes."""
context = payload.context
port = payload.latest_state
if event == events.AFTER_UPDATE:
@@ -56,13 +56,6 @@ class SecurityGroupServerNotifierRpcMixin(sg_db.SecurityGroupDbMixin):
else:
self.notify_security_groups_member_updated(context, port)
@registry.receives(resources.PORT, [events.AFTER_DELETE])
def notify_sg_on_port_change(self, resource, event, trigger, context,
port, *args, **kwargs):
"""Trigger notification to other SG members on port changes."""
self.notify_security_groups_member_updated(context, port)
def create_security_group_rule(self, context, security_group_rule):
rule = super(SecurityGroupServerNotifierRpcMixin,
self).create_security_group_rule(context,

View File

@@ -123,9 +123,9 @@ class Notifier(object):
@registry.receives(resources.PORT, [events.AFTER_DELETE])
def process_port_delete_event(self, resource, event, trigger,
original_port=None, port=None,
**kwargs):
payload):
# We only want to notify about baremetal ports.
port = payload.latest_state
if not (port[portbindings_def.VNIC_TYPE] ==
portbindings_def.VNIC_BAREMETAL):
# TODO(TheJulia): Add the smartnic flag at some point...

View File

@@ -25,7 +25,6 @@ from neutron_lib import context as n_ctx
from neutron_lib.db import api as db_api
from oslo_log import log as logging
from neutron._i18n import _
from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.handlers import resources_rpc
from neutron.objects import address_group
@@ -40,11 +39,6 @@ LOG = logging.getLogger(__name__)
class _ObjectChangeHandler(object):
MAX_IDLE_FOR = 1
_TO_CLEAN = weakref.WeakSet()
_PAYLOAD_RESOURCES = (resources.NETWORK,
resources.ADDRESS_GROUP,
resources.SECURITY_GROUP_RULE,
resources.SUBNET,
resources.SECURITY_GROUP)
def __init__(self, resource, object_class, resource_push_api):
self._resource = resource
@@ -54,15 +48,8 @@ class _ObjectChangeHandler(object):
self._semantic_warned = False
for event in (events.AFTER_CREATE, events.AFTER_UPDATE,
events.AFTER_DELETE):
handler = self.handle_event
registry.subscribe(self.handle_event, resource, event)
# TODO(boden): remove shim below once all events use payloads
if resource in self._PAYLOAD_RESOURCES:
handler = self.handle_payload_event
if resource == resources.PORT and event in (events.AFTER_CREATE,
events.AFTER_UPDATE):
handler = self.handle_payload_event
registry.subscribe(handler, resource, event)
self._stop = threading.Event()
self._worker = threading.Thread(
target=self.dispatch_events,
@@ -98,8 +85,12 @@ class _ObjectChangeHandler(object):
self._semantic_warned = True
return True
def handle_payload_event(self, resource, event,
trigger, payload=None):
def handle_event(self, resource, event, trigger, payload):
"""Callback handler for resource change that pushes change to RPC.
We always retrieve the latest state and ignore what was in the
payload to ensure that we don't get any stale data.
"""
if self._is_session_semantic_violated(
payload.context, resource, event):
return
@@ -108,20 +99,6 @@ class _ObjectChangeHandler(object):
# to the server-side event that triggered it
self._resources_to_push.put((resource_id, payload.context.to_dict()))
def handle_event(self, resource, event, trigger,
context, *args, **kwargs):
"""Callback handler for resource change that pushes change to RPC.
We always retrieve the latest state and ignore what was in the
payload to ensure that we don't get any stale data.
"""
if self._is_session_semantic_violated(context, resource, event):
return
resource_id = self._extract_resource_id(kwargs)
# we preserve the context so we can trace a receive on the agent back
# to the server-side event that triggered it
self._resources_to_push.put((resource_id, context.to_dict()))
def dispatch_events(self):
# TODO(kevinbenton): now that we are batching these, convert to a
# single get_objects call for all of them
@@ -157,14 +134,6 @@ class _ObjectChangeHandler(object):
{'name': self._worker.name,
'msgs': self._resources_to_push.unfinished_tasks})
def _extract_resource_id(self, callback_kwargs):
id_kwarg = '%s_id' % self._resource
if id_kwarg in callback_kwargs:
return callback_kwargs[id_kwarg]
if self._resource in callback_kwargs:
return callback_kwargs[self._resource]['id']
raise RuntimeError(_("Couldn't find resource ID in callback event"))
@classmethod
def clean_up(cls, *args, **kwargs):
"""Ensure all threads that were created were destroyed cleanly."""

View File

@@ -1998,12 +1998,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _post_delete_port(self, context, port, router_ids,
bound_mech_contexts):
kwargs = {
'context': context,
'port': port,
'router_ids': router_ids,
}
registry.notify(resources.PORT, events.AFTER_DELETE, self, **kwargs)
registry.publish(resources.PORT, events.AFTER_DELETE, self,
payload=events.DBEventPayload(
context,
metadata={'router_ids': router_ids},
resource_id=port['id'],
states=(port,)))
try:
# Note that DVR Interface ports will have bindings on
# multiple hosts, and so will have multiple mech_contexts,

View File

@@ -420,17 +420,12 @@ class NovaSegmentNotifier(object):
'routed network segment %(segment_id)s',
{'host': event.host, 'segment_id': segment_id})
@registry.receives(resources.PORT, [events.AFTER_CREATE])
def _notify_port_created(self, resource, event, trigger,
payload=None):
# TODO(boden): refactor back into 1 method when all code is moved
# to event payloads
return self._notify_port_created_or_deleted(
resource, event, trigger, payload.context, payload.latest_state)
@registry.receives(resources.PORT, [events.AFTER_DELETE])
@registry.receives(resources.PORT, [events.AFTER_CREATE,
events.AFTER_DELETE])
def _notify_port_created_or_deleted(self, resource, event, trigger,
context, port, **kwargs):
payload):
context = payload.context
port = payload.latest_state
if not self._does_port_require_nova_inventory_update(port):
return
ipv4_subnets_number, segment_id = (

View File

@@ -267,42 +267,7 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
def test__native_notification_unsubscribes(self):
self.assertFalse(self.notifier._unsubscribed_resources)
for res in (resources.PORT,):
self.notifier._unsubscribed_resources = []
kwargs = {res: {}}
if res == resources.PORT:
registry.publish(res, events.AFTER_CREATE, self,
payload=events.DBEventPayload(
mock.Mock(), states=({res: {}},)))
else:
registry.notify(res, events.AFTER_CREATE, self,
context=mock.Mock(), **kwargs)
# don't unsubscribe until all three types are observed
self.assertEqual([], self.notifier._unsubscribed_resources)
if res == resources.PORT:
registry.publish(res, events.AFTER_UPDATE, self,
payload=events.DBEventPayload(
mock.Mock(), states=({},)))
else:
registry.notify(res, events.AFTER_UPDATE, self,
context=mock.Mock(), **kwargs)
self.assertEqual([], self.notifier._unsubscribed_resources)
registry.notify(res, events.AFTER_DELETE, self,
context=mock.Mock(), **kwargs)
self.assertEqual([res], self.notifier._unsubscribed_resources)
# after first time, no further unsubscribing should happen
if res == resources.PORT:
registry.publish(res, events.AFTER_CREATE, self,
payload=events.DBEventPayload(
mock.Mock(), states=({res: {}})))
else:
registry.notify(res, events.AFTER_CREATE, self,
context=mock.Mock(), **kwargs)
self.assertEqual([res], self.notifier._unsubscribed_resources)
for res in (resources.NETWORK, resources.SUBNET):
for res in (resources.PORT, resources.NETWORK, resources.SUBNET):
self.notifier._unsubscribed_resources = []
registry.publish(res, events.AFTER_CREATE, self,
payload=events.DBEventPayload(mock.Mock()))

View File

@@ -133,9 +133,11 @@ class TestIronicNotifier(base.BaseTestCase):
autospec=True)
def test_process_port_delete_event(self, mock_queue_event):
port = get_fake_port()
original_port = None
self.ironic_notifier.process_port_delete_event(
'fake_resource', 'fake_event', 'fake_trigger', original_port=None,
port=port, **{})
'fake_resource', 'fake_event', 'fake_trigger',
payload=events.DBEventPayload(
mock.Mock(), states=(original_port, port)))
mock_queue_event.assert_called_with(
self.ironic_notifier.batch_notifier,
{'event': 'network.delete_port',
@@ -150,10 +152,12 @@ class TestIronicNotifier(base.BaseTestCase):
autospec=True)
def test_process_port_event_empty_uuid_field(self, mock_queue_event):
port = get_fake_port()
original_port = None
port.update({'device_id': ''})
self.ironic_notifier.process_port_delete_event(
'fake_resource', 'fake_event', 'fake_trigger', original_port=None,
port=port, **{})
'fake_resource', 'fake_event', 'fake_trigger',
payload=events.DBEventPayload(
mock.Mock(), states=(original_port, port)))
mock_queue_event.assert_called_with(
self.ironic_notifier.batch_notifier,
{'event': 'network.delete_port',
@@ -166,9 +170,11 @@ class TestIronicNotifier(base.BaseTestCase):
@mock.patch.object(eventlet, 'spawn_n', autospec=True)
def test_queue_events(self, mock_spawn_n):
port = get_fake_port()
original_port = None
self.ironic_notifier.process_port_delete_event(
'fake_resource', 'fake_event', 'fake_trigger', original_port=None,
port=port, **{})
'fake_resource', 'fake_event', 'fake_trigger',
payload=events.DBEventPayload(
mock.Mock(), states=(original_port, port)))
port = get_fake_port()
port.update({'status': n_const.PORT_STATUS_ACTIVE})

View File

@@ -1099,8 +1099,9 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
def test_port_after_delete_outside_transaction(self):
self.tx_open = True
receive = lambda *a, **k: setattr(self, 'tx_open',
k['context'].session.is_active)
receive = lambda r, e, t, payload: \
setattr(self, 'tx_open', payload.context.session.is_active)
with self.port() as p:
registry.subscribe(receive, resources.PORT, events.AFTER_DELETE)
self._delete('ports', p['port']['id'])
@@ -2093,29 +2094,34 @@ class TestMl2DvrPortsV2(TestMl2PortsV2):
return_value=router_ids):
port_id = port['port']['id']
self.plugin.delete_port(self.context, port_id)
self.assertEqual(2, notify.call_count)
self.assertEqual(1, publish.call_count)
self.assertEqual(1, notify.call_count)
self.assertEqual(2, publish.call_count)
# needed for a full match in the assertion below
port['port']['extra_dhcp_opts'] = []
port['port']['standard_attr_id'] = mock.ANY
expected = [mock.call(resources.PORT, events.PRECOMMIT_DELETE,
mock.ANY, network=mock.ANY, bind=mock.ANY,
port=port['port'], port_db=mock.ANY,
context=self.context, levels=mock.ANY,
id=mock.ANY, bindings=mock.ANY),
mock.call(resources.PORT, events.AFTER_DELETE,
mock.ANY, context=self.context,
port=port['port'],
router_ids=router_ids)]
id=mock.ANY, bindings=mock.ANY)]
notify.assert_has_calls(expected)
expected = [mock.call(resources.PORT, events.BEFORE_DELETE,
mock.ANY, payload=mock.ANY)]
publish.assert_has_calls(expected)
payload = publish.call_args[1]['payload']
payload = publish.call_args_list[0][1]['payload']
self.assertEqual(port_id, payload.resource_id)
self.assertTrue(payload.metadata['port_check'])
expected = [mock.call(resources.PORT, events.AFTER_DELETE,
mock.ANY, payload=mock.ANY)]
publish.assert_has_calls(expected)
payload = publish.call_args_list[1][1]['payload']
self.assertEqual(port_id, payload.resource_id)
def test_delete_port_with_floatingip_notifies_l3_plugin(self):
self.test_delete_port_notifies_l3_plugin(floating_ip=True)

View File

@@ -1250,19 +1250,16 @@ class L3DvrSchedulerTestCase(L3SchedulerBaseMixin,
portbindings.HOST_ID: 'host1',
}
kwargs = {
'context': self.adminContext,
'port': port,
'removed_routers': [
{'agent_id': 'foo_agent', 'router_id': 'foo_id'},
],
}
removed_routers = [{'agent_id': 'foo_agent',
'router_id': 'foo_id',
'host': 'foo_host'}]
l3plugin.get_dvr_routers_to_remove.return_value = removed_routers
l3_dvrscheduler_db._notify_port_delete(
'port', 'after_delete', plugin, **kwargs)
'port', 'after_delete', plugin,
payload=events.DBEventPayload(
self.adminContext,
metadata={'removed_routers': removed_routers},
states=(port,)))
l3plugin.delete_arp_entry_for_dvr_service_port.\
assert_called_once_with(
self.adminContext, mock.ANY)