diff --git a/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py b/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py index 33dc11aa082..3e4b3cf96c0 100644 --- a/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py +++ b/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py @@ -278,17 +278,18 @@ class DhcpAgentNotifyAPI(object): {'admin_state_up': admin_state_up}, host) def _after_router_interface_created(self, resource, event, trigger, - **kwargs): - self._notify_agents(kwargs['context'], 'port_create_end', - {'port': kwargs['port']}, - kwargs['port']['network_id']) + payload=None): + port = payload.metadata.get('port') + self._notify_agents(payload.context, 'port_create_end', + {'port': port}, + port['network_id']) def _after_router_interface_deleted(self, resource, event, trigger, - **kwargs): - self._notify_agents(kwargs['context'], 'port_delete_end', - {'port_id': kwargs['port']['id'], - 'fixed_ips': kwargs['port']['fixed_ips']}, - kwargs['port']['network_id']) + payload=None): + port = payload.metadata.get('port') + self._notify_agents(payload.context, 'port_delete_end', + {'port_id': port['id']}, + port['network_id']) def _native_event_send_dhcp_notification_payload( self, resource, event, trigger, payload=None): diff --git a/neutron/db/l3_db.py b/neutron/db/l3_db.py index e89f9fa885e..ee3a5dc8aa2 100644 --- a/neutron/db/l3_db.py +++ b/neutron/db/l3_db.py @@ -747,15 +747,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase, prevent the port to be attach to the router. """ try: - registry.notify(resources.ROUTER_INTERFACE, - events.BEFORE_CREATE, - self, - context=context, - router_db=router_db, - port=port, - interface_info=interface_info, - router_id=router_db.id, - network_id=port['network_id']) + metadata = { + 'port': port, 'interface_info': interface_info, + 'network_id': port['network_id']} + registry.publish(resources.ROUTER_INTERFACE, + events.BEFORE_CREATE, self, + payload=events.DBEventPayload( + context, states=(router_db,), + metadata=metadata, + resource_id=router_db.id)) except exceptions.CallbackFailure as e: # raise the underlying exception reason = (_('cannot perform router interface attachment ' @@ -888,19 +888,20 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase, gw_network_id = router.gw_port.network_id gw_ips = [x['ip_address'] for x in router.gw_port.fixed_ips] - registry.notify(resources.ROUTER_INTERFACE, - events.AFTER_CREATE, - self, - context=context, - network_id=gw_network_id, - gateway_ips=gw_ips, - cidrs=[x['cidr'] for x in subnets], - subnets=subnets, - port_id=port['id'], - router_id=router_id, - port=port, - new_interface=new_router_intf, - interface_info=interface_info) + cidrs = [x['cidr'] for x in subnets] + metadata = {'interface_info': interface_info, + 'new_interface': new_router_intf, + 'port': port, + 'subnets': subnets, + 'cidrs': cidrs, + 'gateway_ips': gw_ips, + 'network_id': gw_network_id} + registry.publish(resources.ROUTER_INTERFACE, + events.AFTER_CREATE, self, + payload=events.DBEventPayload( + context, metadata=metadata, + states=(router,), + resource_id=router_id)) return self._make_router_interface_info( router.id, port['tenant_id'], port['id'], port['network_id'], @@ -1040,16 +1041,16 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase, gw_network_id = router.gw_port.network_id gw_ips = [x['ip_address'] for x in router.gw_port.fixed_ips] - registry.notify(resources.ROUTER_INTERFACE, - events.AFTER_DELETE, - self, - context=context, - cidrs=[x['cidr'] for x in subnets], - network_id=gw_network_id, - gateway_ips=gw_ips, - port=port, - router_id=router_id, - interface_info=interface_info) + cidrs = [x['cidr'] for x in subnets] + metadata = {'interface_info': interface_info, + 'port': port, 'gateway_ips': gw_ips, + 'network_id': gw_network_id, 'cidrs': cidrs} + registry.publish(resources.ROUTER_INTERFACE, + events.AFTER_DELETE, self, + payload=events.DBEventPayload( + context, metadata=metadata, + resource_id=router_id)) + return self._make_router_interface_info(router_id, port['tenant_id'], port['id'], port['network_id'], subnets[0]['id'], diff --git a/neutron/db/l3_dvr_db.py b/neutron/db/l3_dvr_db.py index 973f98c2147..75940d40d11 100644 --- a/neutron/db/l3_dvr_db.py +++ b/neutron/db/l3_dvr_db.py @@ -497,30 +497,34 @@ class DVRResourceOperationHandler(object): self.update_arp_entry_for_dvr_service_port(context, service_port_dict) - @registry.receives(resources.ROUTER_INTERFACE, [events.BEFORE_CREATE]) @db_api.retry_if_session_inactive() - def _add_csnat_on_interface_create(self, resource, event, trigger, - context, router_db, port, **kwargs): - """Event handler to for csnat port creation on interface creation.""" - if not router_db.extra_attributes.distributed or not router_db.gw_port: + def _retry_add_csnat_on_interface_create(self, context, payload): + router_db = payload.latest_state + if not router_db.extra_attributes.distributed or not \ + router_db.gw_port: return - admin_context = n_utils.get_elevated_context(context) + admin_context = payload.context.elevated() + port = payload.metadata.get('port') self._add_csnat_router_interface_port( - admin_context, router_db, port['network_id'], + admin_context, payload.latest_state, + port['network_id'], [{'subnet_id': port['fixed_ips'][-1]['subnet_id']}]) - @registry.receives(resources.ROUTER_INTERFACE, [events.AFTER_CREATE]) + @registry.receives(resources.ROUTER_INTERFACE, [events.BEFORE_CREATE]) + def _add_csnat_on_interface_create(self, resource, event, trigger, + payload=None): + """Event handler to for csnat port creation on interface creation.""" + return self._retry_add_csnat_on_interface_create( + payload.context, payload) + @db_api.retry_if_session_inactive() - def _update_snat_v6_addrs_after_intf_update(self, resource, event, trigger, - context, subnets, port, - router_id, new_interface, - **kwargs): - if new_interface: + def _retry_update_snat_v6_addrs_after_intf_update(self, context, payload): + if payload.metadata.get('new_interface'): # _add_csnat_on_interface_create handler deals with new ports return # if not a new interface, the interface was added to a new subnet, # which is the first in this list - subnet = subnets[0] + subnet = payload.metadata.get('subnets')[0] if not subnet or subnet['ip_version'] != 6: return # NOTE: For IPv6 additional subnets added to the same @@ -528,8 +532,8 @@ class DVRResourceOperationHandler(object): # IPv6 subnet # Add new prefix to an existing ipv6 csnat port with the # same network id if one exists - admin_ctx = n_utils.get_elevated_context(context) - router = self.l3plugin._get_router(admin_ctx, router_id) + admin_ctx = payload.context.elevated() + router = self.l3plugin._get_router(admin_ctx, payload.resource_id) cs_port = self._find_v6_router_port_by_network_and_device_owner( router, subnet['network_id'], const.DEVICE_OWNER_ROUTER_SNAT) if not cs_port: @@ -537,9 +541,11 @@ class DVRResourceOperationHandler(object): new_fixed_ip = {'subnet_id': subnet['id']} fixed_ips = list(cs_port['fixed_ips']) fixed_ips.append(new_fixed_ip) + port = payload.metadata.get('port') try: updated_port = self._core_plugin.update_port( - admin_ctx, cs_port['id'], {'port': {'fixed_ips': fixed_ips}}) + admin_ctx, cs_port['id'], + {'port': {'fixed_ips': fixed_ips}}) except Exception: with excutils.save_and_reraise_exception(): # we need to try to undo the updated router @@ -557,10 +563,11 @@ class DVRResourceOperationHandler(object): # future with a compare-and-swap style update # using the revision number of the port. p = self._core_plugin.get_port(admin_ctx, port['id']) - rollback_fixed_ips = [ip for ip in p['fixed_ips'] - if ip['subnet_id'] != subnet['id']] - upd = {'port': {'fixed_ips': rollback_fixed_ips}} - self._core_plugin.update_port(admin_ctx, port['id'], upd) + fixed_ips = [ip for ip in p['fixed_ips'] + if ip['subnet_id'] != subnet['id']] + upd = {'port': {'fixed_ips': fixed_ips}} + self._core_plugin.update_port( + admin_ctx, port['id'], upd) try: revert() except Exception: @@ -569,8 +576,14 @@ class DVRResourceOperationHandler(object): port['id']) LOG.debug("CSNAT port updated for IPv6 subnet: %s", updated_port) - def _find_v6_router_port_by_network_and_device_owner(self, router, net_id, - device_owner): + @registry.receives(resources.ROUTER_INTERFACE, [events.AFTER_CREATE]) + def _update_snat_v6_addrs_after_intf_update(self, resource, event, trigger, + payload=None): + return self._retry_update_snat_v6_addrs_after_intf_update( + payload.context, payload) + + def _find_v6_router_port_by_network_and_device_owner( + self, router, net_id, device_owner): for port in router.attached_ports: p = port['port'] if (p['network_id'] == net_id and @@ -635,13 +648,11 @@ class DVRResourceOperationHandler(object): self.related_dvr_router_routers[cache_key] = ( existing_routers | other_routers) - @registry.receives(resources.ROUTER_INTERFACE, [events.AFTER_DELETE]) @db_api.retry_if_session_inactive() - def _cleanup_after_interface_removal(self, resource, event, trigger, - context, port, interface_info, - router_id, **kwargs): - """Handler to cleanup distributed resources after intf removal.""" - router = self.l3plugin._get_router(context, router_id) + def _retry_cleanup_after_interface_removal(self, context, payload): + context = payload.context + port = payload.metadata.get('port') + router = self.l3plugin._get_router(context, payload.resource_id) if not router.extra_attributes.distributed: return @@ -649,29 +660,31 @@ class DVRResourceOperationHandler(object): # the removed port's subnets and then subtract out any hosts still # hosting the router for the remaining interfaces router_hosts_for_removed = self.l3plugin._get_dvr_hosts_for_subnets( - context, subnet_ids={ip['subnet_id'] for ip in port['fixed_ips']}) + context, + subnet_ids={ip['subnet_id'] for ip in port['fixed_ips']}) router_hosts_after = self.l3plugin._get_dvr_hosts_for_router( - context, router_id) + context, payload.resource_id) removed_hosts = set(router_hosts_for_removed) - set(router_hosts_after) if removed_hosts: # Get hosts where this router is placed as "related" to other dvr # routers and don't remove it from such hosts - related_hosts = self._get_other_dvr_hosts(context, router_id) + related_hosts = self._get_other_dvr_hosts(context, + payload.resource_id) agents = self.l3plugin.get_l3_agents( context, filters={'host': removed_hosts}) bindings = rb_obj.RouterL3AgentBinding.get_objects( - context, router_id=router_id) + context, router_id=payload.resource_id) snat_binding = bindings.pop() if bindings else None connected_dvr_routers = set( self.l3plugin._get_other_dvr_router_ids_connected_router( - context, router_id)) + context, payload.resource_id)) for agent in agents: is_this_snat_agent = ( snat_binding and snat_binding.l3_agent_id == agent['id']) if (not is_this_snat_agent and agent['host'] not in related_hosts): self.l3plugin.l3_rpc_notifier.router_removed_from_agent( - context, router_id, agent['host']) + context, payload.resource_id, agent['host']) for connected_router_id in connected_dvr_routers: connected_router_hosts = set( self.l3plugin._get_dvr_hosts_for_router( @@ -680,17 +693,18 @@ class DVRResourceOperationHandler(object): self._get_other_dvr_hosts( context, connected_router_id)) if agent['host'] not in connected_router_hosts: - self.l3plugin.l3_rpc_notifier.\ + self.l3plugin.l3_rpc_notifier. \ router_removed_from_agent( context, connected_router_id, agent['host']) # if subnet_id not in interface_info, request was to remove by port + interface_info = payload.metadata.get('interface_info') sub_id = (interface_info.get('subnet_id') or port['fixed_ips'][0]['subnet_id']) self._cleanup_related_hosts_after_interface_removal( - context, router_id, sub_id) + context, payload.resource_id, sub_id) self._cleanup_related_routers_after_interface_removal( - context, router_id, sub_id) + context, payload.resource_id, sub_id) is_multiple_prefix_csport = ( self._check_for_multiprefix_csnat_port_and_update( context, router, port['network_id'], sub_id)) @@ -700,6 +714,13 @@ class DVRResourceOperationHandler(object): n_utils.get_elevated_context(context), router, subnet_id=sub_id) + @registry.receives(resources.ROUTER_INTERFACE, [events.AFTER_DELETE]) + def _cleanup_after_interface_removal(self, resource, event, trigger, + payload=None): + """Handler to cleanup distributed resources after intf removal.""" + return self._retry_cleanup_after_interface_removal( + payload.context, payload) + def _cleanup_related_hosts_after_interface_removal( self, context, router_id, subnet_id): router_hosts = self.l3plugin._get_dvr_hosts_for_router( diff --git a/neutron/tests/functional/services/l3_router/test_l3_dvr_router_plugin.py b/neutron/tests/functional/services/l3_router/test_l3_dvr_router_plugin.py index 74639edaa00..4596f2cd831 100644 --- a/neutron/tests/functional/services/l3_router/test_l3_dvr_router_plugin.py +++ b/neutron/tests/functional/services/l3_router/test_l3_dvr_router_plugin.py @@ -1970,27 +1970,22 @@ class L3DvrTestCase(L3DvrTestCaseBase): interface_info = {'subnet_id': subnet['subnet']['id']} self.l3_plugin.add_router_interface( self.context, router['id'], interface_info) - kwargs = {'context': self.context, 'router_id': router['id'], - 'network_id': net['network']['id'], - 'router_db': mock.ANY, - 'port': mock.ANY, - 'interface_info': interface_info} notif_handler_before.callback.assert_called_once_with( resources.ROUTER_INTERFACE, events.BEFORE_CREATE, - mock.ANY, **kwargs) - kwargs_after = {'cidrs': mock.ANY, - 'context': mock.ANY, - 'gateway_ips': mock.ANY, - 'interface_info': mock.ANY, - 'network_id': None, - 'port': mock.ANY, - 'new_interface': True, - 'subnets': mock.ANY, - 'port_id': mock.ANY, - 'router_id': router['id']} + mock.ANY, payload=mock.ANY) + payload = notif_handler_before.mock_calls[0][2]['payload'] + self.assertEqual(self.context, payload.context) + self.assertEqual(router['id'], payload.resource_id) + self.assertEqual(net['network']['id'], + payload.metadata.get('network_id')) + self.assertEqual(interface_info, + payload.metadata.get('interface_info')) + notif_handler_after.callback.assert_called_once_with( resources.ROUTER_INTERFACE, events.AFTER_CREATE, - mock.ANY, **kwargs_after) + mock.ANY, payload=mock.ANY) + payload = notif_handler_before.mock_calls[0][2]['payload'] + self.assertEqual(router['id'], payload.resource_id) def test_add_router_interface_by_port_notifications(self): notif_handler_before = mock.Mock() @@ -2007,28 +2002,26 @@ class L3DvrTestCase(L3DvrTestCaseBase): self.port(subnet=subnet) as port: interface_info = {'port_id': port['port']['id']} self.l3_plugin.add_router_interface( - self.context, router['id'], interface_info) - kwargs = {'context': self.context, 'router_id': router['id'], - 'network_id': net['network']['id'], - 'router_db': mock.ANY, - 'port': mock.ANY, - 'interface_info': interface_info} + self.context, router['id'], interface_info) + notif_handler_before.callback.assert_called_once_with( resources.ROUTER_INTERFACE, events.BEFORE_CREATE, - mock.ANY, **kwargs) - kwargs_after = {'cidrs': mock.ANY, - 'context': mock.ANY, - 'gateway_ips': mock.ANY, - 'interface_info': mock.ANY, - 'network_id': None, - 'port': mock.ANY, - 'new_interface': True, - 'subnets': mock.ANY, - 'port_id': port['port']['id'], - 'router_id': router['id']} + mock.ANY, payload=mock.ANY) + payload = notif_handler_before.mock_calls[0][2]['payload'] + self.assertEqual(interface_info, + payload.metadata.get('interface_info')) + self.assertEqual(net['network']['id'], + payload.metadata.get('network_id')) + self.assertEqual(router['id'], payload.resource_id) + self.assertEqual(self.context, payload.context) + notif_handler_after.callback.assert_called_once_with( resources.ROUTER_INTERFACE, events.AFTER_CREATE, - mock.ANY, **kwargs_after) + mock.ANY, payload=mock.ANY) + payload = notif_handler_after.mock_calls[0][2]['payload'] + self.assertEqual(port['port']['id'], + payload.metadata.get('port').get('id')) + self.assertEqual(router['id'], payload.resource_id) class L3DvrTestCaseMigration(L3DvrTestCaseBase): diff --git a/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py b/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py index 327ad7ad961..2a975088682 100644 --- a/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py +++ b/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py @@ -238,19 +238,23 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase): expected_scheduling=0, expected_casts=0) def test__notify_agents_with_router_interface_add(self): + payload = events.DBEventPayload( + mock.Mock(), metadata={ + 'port': {'id': 'foo_port_id', + 'network_id': 'foo_network_id'}}) self._test__notify_agents_with_function( lambda: self.notifier._after_router_interface_created( - mock.ANY, mock.ANY, mock.ANY, context=mock.Mock(), - port={'id': 'foo_port_id', 'network_id': 'foo_network_id'}), + mock.ANY, mock.ANY, mock.ANY, payload=payload), expected_scheduling=1, expected_casts=1) def test__notify_agents_with_router_interface_delete(self): + payload = events.DBEventPayload( + mock.Mock(), metadata={ + 'port': {'id': 'foo_port_id', + 'network_id': 'foo_network_id'}}) self._test__notify_agents_with_function( lambda: self.notifier._after_router_interface_deleted( - mock.ANY, mock.ANY, mock.ANY, context=mock.Mock(), - port={'id': 'foo_port_id', 'network_id': 'foo_network_id', - 'fixed_ips': {'subnet_id': 'subnet1', - 'ip_address': '10.0.0.1'}}), + mock.ANY, mock.ANY, mock.ANY, payload=payload), expected_scheduling=0, expected_casts=1) def test__fanout_message(self): diff --git a/neutron/tests/unit/db/test_l3_db.py b/neutron/tests/unit/db/test_l3_db.py index 70cb49320dd..063f3de30f1 100644 --- a/neutron/tests/unit/db/test_l3_db.py +++ b/neutron/tests/unit/db/test_l3_db.py @@ -275,7 +275,7 @@ class TestL3_NAT_dbonly_mixin( mock.ANY, fip, floatingip_obj) def test__notify_attaching_interface(self): - with mock.patch.object(l3_db.registry, 'notify') as mock_notify: + with mock.patch.object(l3_db.registry, 'publish') as mock_notify: context = mock.MagicMock() router_id = 'router_id' net_id = 'net_id' @@ -284,12 +284,17 @@ class TestL3_NAT_dbonly_mixin( port = {'network_id': net_id} intf = {} self.db._notify_attaching_interface(context, router_db, port, intf) - kwargs = {'context': context, 'router_id': router_id, - 'network_id': net_id, 'interface_info': intf, - 'router_db': router_db, 'port': port} + mock_notify.assert_called_once_with( resources.ROUTER_INTERFACE, events.BEFORE_CREATE, self.db, - **kwargs) + payload=mock.ANY) + payload = mock_notify.mock_calls[0][2]['payload'] + self.assertEqual(context, payload.context) + self.assertEqual(router_id, payload.resource_id) + self.assertEqual(net_id, payload.metadata.get('network_id')) + self.assertEqual(intf, payload.metadata.get('interface_info')) + self.assertEqual(router_db, payload.latest_state) + self.assertEqual(port, payload.metadata.get('port')) def test__create_gw_port(self): # NOTE(slaweq): this test is probably wrong diff --git a/neutron/tests/unit/extensions/test_l3.py b/neutron/tests/unit/extensions/test_l3.py index 28761c14d4b..8fbfe2fb66f 100644 --- a/neutron/tests/unit/extensions/test_l3.py +++ b/neutron/tests/unit/extensions/test_l3.py @@ -2113,7 +2113,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): def test_router_remove_interface_callback_failure_returns_409(self): with self.router() as r,\ self.subnet() as s,\ - mock.patch.object(registry, 'notify') as notify: + mock.patch.object(registry, 'publish') as notify: errors = [ exceptions.NotificationError( 'foo_callback_id', n_exc.InUse()), @@ -2138,7 +2138,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): def test_router_clear_gateway_callback_failure_returns_409(self): with self.router() as r,\ self.subnet() as s,\ - mock.patch.object(registry, 'notify') as notify: + mock.patch.object(registry, 'publish') as notify: errors = [ exceptions.NotificationError( 'foo_callback_id', n_exc.InUse()),