use payloads for PORT AFTER_CREATE events

This patch switches the code over to the payload style of callbacks [1]
for PORT AFTER_CREATE events. In addition, it adds a branch/shim to the
dhcp_rpc_agent_api module to support both payload- and kwarg-style
callbacks.

NeutronLibImpact

[1]
https://docs.openstack.org/neutron-lib/latest/contributor/callbacks.html

Change-Id: I25d43d4f8f2390b07e0d11c631f894d88669bbe0
Author: Nurmatov Mamatisa, 2021-05-21 11:57:26 +03:00 (committed by Mamatisa Nurmatov)
parent 7e98d18927
commit c9fce3a8b6
9 changed files with 72 additions and 39 deletions
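
For context, the two callback styles this change bridges look roughly as
follows. This is a minimal, illustrative sketch against neutron-lib's
callback registry; the receiver names (legacy_port_created, port_created)
are hypothetical and not part of this patch:

    from neutron_lib.callbacks import events
    from neutron_lib.callbacks import registry
    from neutron_lib.callbacks import resources


    # Legacy kwarg-style receiver: event data arrives as loose kwargs.
    def legacy_port_created(resource, event, trigger, **kwargs):
        context = kwargs['context']
        port = kwargs['port']
        # ... act on the new port ...

    # Payload-style receiver: event data is bundled in a DBEventPayload.
    def port_created(resource, event, trigger, payload=None):
        context = payload.context
        port = payload.latest_state  # the last entry in payload.states
        # ... act on the new port ...

    registry.subscribe(port_created, resources.PORT, events.AFTER_CREATE)

    # Publisher side (as the ML2 hunk below does for port creation):
    # registry.publish(resources.PORT, events.AFTER_CREATE, publisher,
    #                  payload=events.DBEventPayload(context, states=(port,),
    #                                                resource_id=port['id']))

DBEventPayload.latest_state returns the last element of states, which is why
the converted receivers in this patch read payload.latest_state where the old
code read kwargs['port'].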


@@ -117,8 +117,12 @@ class DhcpAgentNotifyAPI(object):
             # TODO(boden): remove shim below once all events use payloads
             if resource == resources.NETWORK:
                 callback = self._native_event_send_dhcp_notification_payload
-            registry.subscribe(callback, resource, events.AFTER_CREATE)
+            if resource == resources.PORT:
+                registry.subscribe(
+                    self._native_event_send_dhcp_notification_payload,
+                    resource, events.AFTER_CREATE)
+            else:
+                registry.subscribe(callback, resource, events.AFTER_CREATE)
             registry.subscribe(callback, resource, events.AFTER_UPDATE)
             registry.subscribe(callback, resource, events.AFTER_DELETE)


@@ -533,17 +533,17 @@ def _dvr_handle_unbound_allowed_addr_pair_del(
         context, port, fixed_ips_to_delete=aa_fixed_ips)


-def _notify_l3_agent_new_port(resource, event, trigger, **kwargs):
+def _notify_l3_agent_new_port(resource, event, trigger, payload=None):
     LOG.debug('Received %(resource)s %(event)s', {
         'resource': resource,
         'event': event})

-    port = kwargs.get('port')
+    port = payload.latest_state
     if not port:
         return

     if n_utils.is_dvr_serviced(port['device_owner']):
         l3plugin = directory.get_plugin(plugin_constants.L3)
-        context = kwargs['context']
+        context = payload.context
         l3plugin.dvr_handle_new_service_port(context, port)
         l3plugin.update_arp_entry_for_dvr_service_port(context, port)


@@ -41,8 +41,15 @@ DHCP_RULE_PORT = {4: (67, 68, const.IPv4), 6: (547, 546, const.IPv6)}
 class SecurityGroupServerNotifierRpcMixin(sg_db.SecurityGroupDbMixin):
     """Mixin class to add agent-based security group implementation."""

-    @registry.receives(resources.PORT, [events.AFTER_CREATE,
-                                        events.AFTER_UPDATE,
+    @registry.receives(resources.PORT, [events.AFTER_CREATE])
+    def _notify_sg_on_port_after_create(
+            self, resource, event, trigger, payload=None):
+        # TODO(boden): refact back into single method when all callbacks are
+        # moved to payload style events
+        self.notify_security_groups_member_updated(
+            payload.context, payload.latest_state)
+
+    @registry.receives(resources.PORT, [events.AFTER_UPDATE,
                                         events.AFTER_DELETE])
     def notify_sg_on_port_change(self, resource, event, trigger, context,
                                  port, *args, **kwargs):


@@ -455,12 +455,13 @@ def _filter_by_subnet(context, fixed_ips):
     return [str(ip['ip_address']) for ip in fixed_ips]


-def _create_port_in_external_dns_service(resource, event, trigger, **kwargs):
+def _create_port_in_external_dns_service(resource, event,
+                                         trigger, payload=None):
     dns_driver = _get_dns_driver()
     if not dns_driver:
         return
-    context = kwargs['context']
-    port = kwargs['port']
+    context = payload.context
+    port = payload.latest_state
     dns_data_db = port_obj.PortDNS.get_object(
         context, port_id=port['id'])
     if not (dns_data_db and dns_data_db['current_dns_name']):


@@ -1461,8 +1461,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         # add network to port dict to save a DB call on dhcp notification
         result['network'] = mech_context.network.current
         # notify any plugin that is interested in port create events
-        kwargs = {'context': context, 'port': result}
-        registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs)
+        registry.publish(resources.PORT, events.AFTER_CREATE, self,
+                         payload=events.DBEventPayload(
+                             context, states=(result,),
+                             resource_id=result['id']))

         try:
             self.mechanism_manager.create_port_postcommit(mech_context)


@@ -416,8 +416,15 @@ class NovaSegmentNotifier(object):
                  'routed network segment %(segment_id)s',
                  {'host': event.host, 'segment_id': segment_id})

-    @registry.receives(resources.PORT,
-                       [events.AFTER_CREATE, events.AFTER_DELETE])
+    @registry.receives(resources.PORT, [events.AFTER_CREATE])
+    def _notify_port_created(self, resource, event, trigger,
+                             payload=None):
+        # TODO(boden): refactor back into 1 method when all code is moved
+        # to event payloads
+        return self._notify_port_created_or_deleted(
+            resource, event, trigger, payload.context, payload.latest_state)
+
+    @registry.receives(resources.PORT, [events.AFTER_DELETE])
     def _notify_port_created_or_deleted(self, resource, event, trigger,
                                         context, port, **kwargs):
         if not self._does_port_require_nova_inventory_update(port):


@@ -266,8 +266,13 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
         for res in (resources.PORT, resources.SUBNET):
             self.notifier._unsubscribed_resources = []
             kwargs = {res: {}}
-            registry.notify(res, events.AFTER_CREATE, self,
-                            context=mock.Mock(), **kwargs)
+            if res == resources.PORT:
+                registry.publish(res, events.AFTER_CREATE, self,
+                                 payload=events.DBEventPayload(
+                                     mock.Mock(), states=({res: {}},)))
+            else:
+                registry.notify(res, events.AFTER_CREATE, self,
+                                context=mock.Mock(), **kwargs)
             # don't unsubscribe until all three types are observed
             self.assertEqual([], self.notifier._unsubscribed_resources)
             registry.notify(res, events.AFTER_UPDATE, self,
@@ -277,8 +282,13 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
                             context=mock.Mock(), **kwargs)
             self.assertEqual([res], self.notifier._unsubscribed_resources)
             # after first time, no further unsubscribing should happen
-            registry.notify(res, events.AFTER_CREATE, self,
-                            context=mock.Mock(), **kwargs)
+            if res == resources.PORT:
+                registry.publish(res, events.AFTER_CREATE, self,
+                                 payload=events.DBEventPayload(
+                                     mock.Mock(), states=({res: {}},)))
+            else:
+                registry.notify(res, events.AFTER_CREATE, self,
+                                context=mock.Mock(), **kwargs)
             self.assertEqual([res], self.notifier._unsubscribed_resources)

         for res in [resources.NETWORK]:


@@ -1080,8 +1080,10 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):

     def test_port_after_create_outside_transaction(self):
         self.tx_open = True
-        receive = lambda *a, **k: setattr(self, 'tx_open',
-                                          k['context'].session.is_active)
+
+        def receive(r, e, t, payload=None):
+            setattr(self, 'tx_open', payload.context.session.is_active)
+
         registry.subscribe(receive, resources.PORT, events.AFTER_CREATE)
         with self.port():
             self.assertFalse(self.tx_open)
@@ -1752,7 +1754,10 @@ class TestMl2PortsV2WithRevisionPlugin(Ml2PluginV2TestCase):
         updated_ports = []
         created_ports = []
         ureceiver = lambda *a, **k: updated_ports.append(k['port'])
-        creceiver = lambda *a, **k: created_ports.append(k['port'])
+
+        def creceiver(r, e, t, payload=None):
+            created_ports.append(payload.latest_state)
+
         registry.subscribe(ureceiver, resources.PORT,
                            events.AFTER_UPDATE)
         registry.subscribe(creceiver, resources.PORT,


@@ -21,9 +21,12 @@ from unittest import mock
 from neutron_lib.api.definitions import l3_ext_ha_mode
 from neutron_lib.api.definitions import portbindings
 from neutron_lib.api.definitions import router_availability_zone
+from neutron_lib.callbacks import events
+from neutron_lib.callbacks import resources
 from neutron_lib import constants
 from neutron_lib import context as n_context
 from neutron_lib.exceptions import l3 as l3_exc
+from neutron_lib import fixture
 from neutron_lib.plugins import constants as plugin_constants
 from neutron_lib.plugins import directory
 from neutron_lib import rpc as n_rpc
@@ -810,6 +813,7 @@ class L3DvrSchedulerTestCase(L3SchedulerBaseMixin,
             service_plugins = None
         super(L3DvrSchedulerTestCase, self).setUp('ml2',
             service_plugins=service_plugins)
+        self.useFixture(fixture.CallbackRegistryFixture())
         self.setup_coreplugin('ml2')
         self.adminContext = n_context.get_admin_context()
         self.dut = L3DvrScheduler()
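
The setUp change above wires in neutron-lib's CallbackRegistryFixture so that
each test runs against its own callback registry rather than the process-global
one, which matters once tests start publishing real PORT AFTER_CREATE events.
A minimal sketch of that pattern; the test class and names below are
illustrative, not part of this patch:

    from neutron_lib.callbacks import events
    from neutron_lib.callbacks import registry
    from neutron_lib.callbacks import resources
    from neutron_lib import fixture
    from oslotest import base


    class CallbackIsolationTest(base.BaseTestCase):
        def setUp(self):
            super().setUp()
            # Subscriptions made in this test cannot leak into other tests.
            self.useFixture(fixture.CallbackRegistryFixture())

        def test_publish_reaches_subscriber(self):
            seen = []

            def receiver(resource, event, trigger, payload=None):
                seen.append(payload.latest_state)

            registry.subscribe(receiver, resources.PORT, events.AFTER_CREATE)
            registry.publish(
                resources.PORT, events.AFTER_CREATE, self,
                payload=events.DBEventPayload(None, states=({'id': 'p1'},)))
            self.assertEqual([{'id': 'p1'}], seen)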
@@ -947,35 +951,28 @@ class L3DvrSchedulerTestCase(L3SchedulerBaseMixin,
         self.assertFalse(l3plugin.get_dvr_routers_to_remove.called)

     def test__notify_l3_agent_new_port_action(self):
-        kwargs = {
-            'context': self.adminContext,
-            'original_port': None,
-            'port': {
-                'device_owner': DEVICE_OWNER_COMPUTE,
-            },
-        }
+        port_dict = {'device_owner': DEVICE_OWNER_COMPUTE}
         l3plugin = mock.Mock()
         directory.add_plugin(plugin_constants.L3, l3plugin)
         l3_dvrscheduler_db._notify_l3_agent_new_port(
-            'port', 'after_create', mock.ANY, **kwargs)
+            resources.PORT, events.AFTER_CREATE, mock.ANY,
+            payload=events.DBEventPayload(
+                self.adminContext, states=(port_dict,)))
         l3plugin.update_arp_entry_for_dvr_service_port.\
             assert_called_once_with(
-                self.adminContext, kwargs.get('port'))
+                self.adminContext, port_dict)
         l3plugin.dvr_handle_new_service_port.assert_called_once_with(
-            self.adminContext, kwargs.get('port'))
+            self.adminContext, port_dict)

     def test__notify_l3_agent_new_port_no_action(self):
-        kwargs = {
-            'context': self.adminContext,
-            'original_port': None,
-            'port': {
-                'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX + 'None',
-            }
-        }
+        port_dict = {
+            'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX + 'None'}
         l3plugin = mock.Mock()
         directory.add_plugin(plugin_constants.L3, l3plugin)
         l3_dvrscheduler_db._notify_l3_agent_new_port(
-            'port', 'after_create', mock.ANY, **kwargs)
+            resources.PORT, events.AFTER_CREATE, mock.ANY,
+            payload=events.DBEventPayload(
+                self.adminContext, states=(port_dict,)))
         self.assertFalse(
             l3plugin.update_arp_entry_for_dvr_service_port.called)
         self.assertFalse(