diff --git a/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py b/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py index 6875e277854..33dc11aa082 100644 --- a/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py +++ b/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py @@ -117,8 +117,12 @@ class DhcpAgentNotifyAPI(object): # TODO(boden): remove shim below once all events use payloads if resource == resources.NETWORK: callback = self._native_event_send_dhcp_notification_payload - - registry.subscribe(callback, resource, events.AFTER_CREATE) + if resource == resources.PORT: + registry.subscribe( + self._native_event_send_dhcp_notification_payload, + resource, events.AFTER_CREATE) + else: + registry.subscribe(callback, resource, events.AFTER_CREATE) registry.subscribe(callback, resource, events.AFTER_UPDATE) registry.subscribe(callback, resource, events.AFTER_DELETE) diff --git a/neutron/db/l3_dvrscheduler_db.py b/neutron/db/l3_dvrscheduler_db.py index 7edf83ee689..eaf0e4664fd 100644 --- a/neutron/db/l3_dvrscheduler_db.py +++ b/neutron/db/l3_dvrscheduler_db.py @@ -533,17 +533,17 @@ def _dvr_handle_unbound_allowed_addr_pair_del( context, port, fixed_ips_to_delete=aa_fixed_ips) -def _notify_l3_agent_new_port(resource, event, trigger, **kwargs): +def _notify_l3_agent_new_port(resource, event, trigger, payload=None): LOG.debug('Received %(resource)s %(event)s', { 'resource': resource, 'event': event}) - port = kwargs.get('port') + port = payload.latest_state if not port: return if n_utils.is_dvr_serviced(port['device_owner']): l3plugin = directory.get_plugin(plugin_constants.L3) - context = kwargs['context'] + context = payload.context l3plugin.dvr_handle_new_service_port(context, port) l3plugin.update_arp_entry_for_dvr_service_port(context, port) diff --git a/neutron/db/securitygroups_rpc_base.py b/neutron/db/securitygroups_rpc_base.py index b7985bac16d..398944ff8dd 100644 --- a/neutron/db/securitygroups_rpc_base.py +++ b/neutron/db/securitygroups_rpc_base.py @@ -41,8 +41,15 @@ DHCP_RULE_PORT = {4: (67, 68, const.IPv4), 6: (547, 546, const.IPv6)} class SecurityGroupServerNotifierRpcMixin(sg_db.SecurityGroupDbMixin): """Mixin class to add agent-based security group implementation.""" - @registry.receives(resources.PORT, [events.AFTER_CREATE, - events.AFTER_UPDATE, + @registry.receives(resources.PORT, [events.AFTER_CREATE]) + def _notify_sg_on_port_after_update( + self, resource, event, trigger, payload=None): + # TODO(boden): refact back into single method when all callbacks are + # moved to payload style events + self.notify_security_groups_member_updated( + payload.context, payload.latest_state) + + @registry.receives(resources.PORT, [events.AFTER_UPDATE, events.AFTER_DELETE]) def notify_sg_on_port_change(self, resource, event, trigger, context, port, *args, **kwargs): diff --git a/neutron/plugins/ml2/extensions/dns_integration.py b/neutron/plugins/ml2/extensions/dns_integration.py index 2bf79f79a52..b6d73d5d7cc 100644 --- a/neutron/plugins/ml2/extensions/dns_integration.py +++ b/neutron/plugins/ml2/extensions/dns_integration.py @@ -455,12 +455,13 @@ def _filter_by_subnet(context, fixed_ips): return [str(ip['ip_address']) for ip in fixed_ips] -def _create_port_in_external_dns_service(resource, event, trigger, **kwargs): +def _create_port_in_external_dns_service(resource, event, + trigger, payload=None): dns_driver = _get_dns_driver() if not dns_driver: return - context = kwargs['context'] - port = kwargs['port'] + context = payload.context + port = payload.latest_state dns_data_db = port_obj.PortDNS.get_object( context, port_id=port['id']) if not (dns_data_db and dns_data_db['current_dns_name']): diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 36c5f9a47d7..33032759bfb 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -1462,8 +1462,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, # add network to port dict to save a DB call on dhcp notification result['network'] = mech_context.network.current # notify any plugin that is interested in port create events - kwargs = {'context': context, 'port': result} - registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs) + registry.publish(resources.PORT, events.AFTER_CREATE, self, + payload=events.DBEventPayload( + context, states=(result,), + resource_id=result['id'])) try: self.mechanism_manager.create_port_postcommit(mech_context) diff --git a/neutron/services/segments/plugin.py b/neutron/services/segments/plugin.py index be00c92c7a3..a228a02f0d3 100644 --- a/neutron/services/segments/plugin.py +++ b/neutron/services/segments/plugin.py @@ -416,8 +416,15 @@ class NovaSegmentNotifier(object): 'routed network segment %(segment_id)s', {'host': event.host, 'segment_id': segment_id}) - @registry.receives(resources.PORT, - [events.AFTER_CREATE, events.AFTER_DELETE]) + @registry.receives(resources.PORT, [events.AFTER_CREATE]) + def _notify_port_created(self, resource, event, trigger, + payload=None): + # TODO(boden): refactor back into 1 method when all code is moved + # to event payloads + return self._notify_port_created_or_deleted( + resource, event, trigger, payload.context, payload.latest_state) + + @registry.receives(resources.PORT, [events.AFTER_DELETE]) def _notify_port_created_or_deleted(self, resource, event, trigger, context, port, **kwargs): if not self._does_port_require_nova_inventory_update(port): diff --git a/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py b/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py index 6a81753e2e8..327ad7ad961 100644 --- a/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py +++ b/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py @@ -266,8 +266,13 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase): for res in (resources.PORT, resources.SUBNET): self.notifier._unsubscribed_resources = [] kwargs = {res: {}} - registry.notify(res, events.AFTER_CREATE, self, - context=mock.Mock(), **kwargs) + if res == resources.PORT: + registry.publish(res, events.AFTER_CREATE, self, + payload=events.DBEventPayload( + mock.Mock(), states=({res: {}},))) + else: + registry.notify(res, events.AFTER_CREATE, self, + context=mock.Mock(), **kwargs) # don't unsubscribe until all three types are observed self.assertEqual([], self.notifier._unsubscribed_resources) registry.notify(res, events.AFTER_UPDATE, self, @@ -277,8 +282,13 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase): context=mock.Mock(), **kwargs) self.assertEqual([res], self.notifier._unsubscribed_resources) # after first time, no further unsubscribing should happen - registry.notify(res, events.AFTER_CREATE, self, - context=mock.Mock(), **kwargs) + if res == resources.PORT: + registry.publish(res, events.AFTER_CREATE, self, + payload=events.DBEventPayload( + mock.Mock(), states=({res: {}}))) + else: + registry.notify(res, events.AFTER_CREATE, self, + context=mock.Mock(), **kwargs) self.assertEqual([res], self.notifier._unsubscribed_resources) for res in [resources.NETWORK]: diff --git a/neutron/tests/unit/plugins/ml2/test_plugin.py b/neutron/tests/unit/plugins/ml2/test_plugin.py index cccaab2f027..cf4db6176cd 100644 --- a/neutron/tests/unit/plugins/ml2/test_plugin.py +++ b/neutron/tests/unit/plugins/ml2/test_plugin.py @@ -1080,8 +1080,10 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase): def test_port_after_create_outside_transaction(self): self.tx_open = True - receive = lambda *a, **k: setattr(self, 'tx_open', - k['context'].session.is_active) + + def receive(r, e, t, payload=None): + setattr(self, 'tx_open', payload.context.session.is_active) + registry.subscribe(receive, resources.PORT, events.AFTER_CREATE) with self.port(): self.assertFalse(self.tx_open) @@ -1752,7 +1754,10 @@ class TestMl2PortsV2WithRevisionPlugin(Ml2PluginV2TestCase): updated_ports = [] created_ports = [] ureceiver = lambda *a, **k: updated_ports.append(k['port']) - creceiver = lambda *a, **k: created_ports.append(k['port']) + + def creceiver(r, e, t, payload=None): + created_ports.append(payload.latest_state) + registry.subscribe(ureceiver, resources.PORT, events.AFTER_UPDATE) registry.subscribe(creceiver, resources.PORT, diff --git a/neutron/tests/unit/scheduler/test_l3_agent_scheduler.py b/neutron/tests/unit/scheduler/test_l3_agent_scheduler.py index 9664421b70e..ec58b13ab6b 100644 --- a/neutron/tests/unit/scheduler/test_l3_agent_scheduler.py +++ b/neutron/tests/unit/scheduler/test_l3_agent_scheduler.py @@ -21,9 +21,12 @@ from unittest import mock from neutron_lib.api.definitions import l3_ext_ha_mode from neutron_lib.api.definitions import portbindings from neutron_lib.api.definitions import router_availability_zone +from neutron_lib.callbacks import events +from neutron_lib.callbacks import resources from neutron_lib import constants from neutron_lib import context as n_context from neutron_lib.exceptions import l3 as l3_exc +from neutron_lib import fixture from neutron_lib.plugins import constants as plugin_constants from neutron_lib.plugins import directory from neutron_lib import rpc as n_rpc @@ -810,6 +813,7 @@ class L3DvrSchedulerTestCase(L3SchedulerBaseMixin, service_plugins = None super(L3DvrSchedulerTestCase, self).setUp('ml2', service_plugins=service_plugins) + self.useFixture(fixture.CallbackRegistryFixture()) self.setup_coreplugin('ml2') self.adminContext = n_context.get_admin_context() self.dut = L3DvrScheduler() @@ -947,35 +951,28 @@ class L3DvrSchedulerTestCase(L3SchedulerBaseMixin, self.assertFalse(l3plugin.get_dvr_routers_to_remove.called) def test__notify_l3_agent_new_port_action(self): - kwargs = { - 'context': self.adminContext, - 'original_port': None, - 'port': { - 'device_owner': DEVICE_OWNER_COMPUTE, - }, - } + port_dict = {'device_owner': DEVICE_OWNER_COMPUTE} l3plugin = mock.Mock() directory.add_plugin(plugin_constants.L3, l3plugin) l3_dvrscheduler_db._notify_l3_agent_new_port( - 'port', 'after_create', mock.ANY, **kwargs) + resources.PORT, events.AFTER_CREATE, mock.ANY, + payload=events.DBEventPayload( + self.adminContext, states=(port_dict,))) l3plugin.update_arp_entry_for_dvr_service_port.\ assert_called_once_with( - self.adminContext, kwargs.get('port')) + self.adminContext, port_dict) l3plugin.dvr_handle_new_service_port.assert_called_once_with( - self.adminContext, kwargs.get('port')) + self.adminContext, port_dict) def test__notify_l3_agent_new_port_no_action(self): - kwargs = { - 'context': self.adminContext, - 'original_port': None, - 'port': { - 'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX + 'None', - } - } + port_dict = { + 'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX + 'None'} l3plugin = mock.Mock() directory.add_plugin(plugin_constants.L3, l3plugin) l3_dvrscheduler_db._notify_l3_agent_new_port( - 'port', 'after_create', mock.ANY, **kwargs) + resources.PORT, events.AFTER_CREATE, mock.ANY, + payload=events.DBEventPayload( + self.adminContext, states=(port_dict,))) self.assertFalse( l3plugin.update_arp_entry_for_dvr_service_port.called) self.assertFalse(