diff --git a/neutron/db/l3_dvrscheduler_db.py b/neutron/db/l3_dvrscheduler_db.py index e367381be53..e67c97f631b 100644 --- a/neutron/db/l3_dvrscheduler_db.py +++ b/neutron/db/l3_dvrscheduler_db.py @@ -22,6 +22,9 @@ from sqlalchemy import orm from sqlalchemy.orm import exc from sqlalchemy.orm import joinedload +from neutron.callbacks import events +from neutron.callbacks import registry +from neutron.callbacks import resources from neutron.common import constants as q_const from neutron.common import utils as n_utils from neutron.db import agents_db @@ -29,6 +32,8 @@ from neutron.db import l3_agentschedulers_db as l3agent_sch_db from neutron.db import model_base from neutron.db import models_v2 from neutron.i18n import _LI, _LW +from neutron import manager +from neutron.plugins.common import constants as service_constants from neutron.plugins.ml2 import db as ml2_db LOG = logging.getLogger(__name__) @@ -309,3 +314,27 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin): self.bind_dvr_router_servicenode( context, router_id, chosen_agent) return chosen_agent + + +def _notify_l3_agent_new_port(resource, event, trigger, **kwargs): + LOG.debug('Received %s %s' % (resource, event)) + port = kwargs['port'] + if not port: + return + + l3plugin = manager.NeutronManager.get_service_plugins().get( + service_constants.L3_ROUTER_NAT) + mac_address_updated = kwargs.get('mac_address_updated') + update_device_up = kwargs.get('update_device_up') + context = kwargs['context'] + if mac_address_updated or update_device_up: + l3plugin.dvr_vmarp_table_update(context, port, "add") + if n_utils.is_dvr_serviced(port['device_owner']): + l3plugin.dvr_update_router_addvm(context, port) + + +def subscribe(): + registry.subscribe( + _notify_l3_agent_new_port, resources.PORT, events.AFTER_UPDATE) + registry.subscribe( + _notify_l3_agent_new_port, resources.PORT, events.AFTER_CREATE) diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 1faacf5a256..aa5620e0421 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -34,6 +34,9 @@ from neutron.api.rpc.handlers import dvr_rpc from neutron.api.rpc.handlers import metadata_rpc from neutron.api.rpc.handlers import securitygroups_rpc from neutron.api.v2 import attributes +from neutron.callbacks import events +from neutron.callbacks import registry +from neutron.callbacks import resources from neutron.common import constants as const from neutron.common import exceptions as exc from neutron.common import ipv6_utils @@ -162,20 +165,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, if self.type_manager.network_matches_filters(network, filters) ] - def _notify_l3_agent_new_port(self, context, port): - if not port: - return - - # Whenever a DVR serviceable port comes up on a - # node, it has to be communicated to the L3 Plugin - # and agent for creating the respective namespaces. - if (utils.is_dvr_serviced(port['device_owner'])): - l3plugin = manager.NeutronManager.get_service_plugins().get( - service_constants.L3_ROUTER_NAT) - if (utils.is_extension_supported( - l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)): - l3plugin.dvr_update_router_addvm(context, port) - def _get_host_port_if_changed(self, mech_context, attrs): binding = mech_context._binding host = attrs and attrs.get(portbindings.HOST_ID) @@ -943,7 +932,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, attrs = port[attributes.PORT] result, mech_context = self._create_port_db(context, port) new_host_port = self._get_host_port_if_changed(mech_context, attrs) - self._notify_l3_agent_new_port(context, new_host_port) + # notify any plugin that is interested in port create events + kwargs = {'context': context, 'port': new_host_port} + registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs) try: self.mechanism_manager.create_port_postcommit(mech_context) @@ -979,7 +970,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, if attrs and attrs.get(portbindings.HOST_ID): new_host_port = self._get_host_port_if_changed( obj['mech_context'], attrs) - self._notify_l3_agent_new_port(context, new_host_port) + kwargs = {'context': context, 'port': new_host_port} + registry.notify( + resources.PORT, events.AFTER_CREATE, self, **kwargs) try: for obj in objects: @@ -997,11 +990,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, def update_port(self, context, id, port): attrs = port[attributes.PORT] need_port_update_notify = False - l3plugin = manager.NeutronManager.get_service_plugins().get( - service_constants.L3_ROUTER_NAT) - is_dvr_enabled = utils.is_extension_supported( - l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS) - session = context.session # REVISIT: Serialize this operation with a semaphore to @@ -1041,10 +1029,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.mechanism_manager.update_port_precommit(mech_context) # Notifications must be sent after the above transaction is complete - if mac_address_updated and l3plugin and is_dvr_enabled: - # NOTE: "add" actually does a 'replace' operation - l3plugin.dvr_vmarp_table_update(context, updated_port, "add") - self._notify_l3_agent_new_port(context, new_host_port) + kwargs = { + 'context': context, + 'port': new_host_port, + 'mac_address_updated': mac_address_updated, + } + registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs) # TODO(apech) - handle errors raised by update_port, potentially # by re-calling update_port with the previous attributes. For diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 16a26704a9c..c4ec6a61f5d 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -19,15 +19,16 @@ from sqlalchemy.orm import exc from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.handlers import dvr_rpc +from neutron.callbacks import events +from neutron.callbacks import registry +from neutron.callbacks import resources from neutron.common import constants as q_const from neutron.common import exceptions from neutron.common import rpc as n_rpc from neutron.common import topics -from neutron.common import utils from neutron.extensions import portbindings from neutron.i18n import _LW from neutron import manager -from neutron.plugins.common import constants as service_constants from neutron.plugins.ml2 import driver_api as api from neutron.plugins.ml2.drivers import type_tunnel # REVISIT(kmestery): Allow the type and mechanism drivers to supply the @@ -167,16 +168,21 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): port_id = plugin.update_port_status(rpc_context, port_id, q_const.PORT_STATUS_ACTIVE, host) - l3plugin = manager.NeutronManager.get_service_plugins().get( - service_constants.L3_ROUTER_NAT) - if (l3plugin and - utils.is_extension_supported(l3plugin, - q_const.L3_DISTRIBUTED_EXT_ALIAS)): - try: - port = plugin._get_port(rpc_context, port_id) - l3plugin.dvr_vmarp_table_update(rpc_context, port, "add") - except exceptions.PortNotFound: - LOG.debug('Port %s not found during ARP update', port_id) + try: + # NOTE(armax): it's best to remove all objects from the + # session, before we try to retrieve the new port object + rpc_context.session.expunge_all() + port = plugin._get_port(rpc_context, port_id) + except exceptions.PortNotFound: + LOG.debug('Port %s not found during update', port_id) + else: + kwargs = { + 'context': rpc_context, + 'port': port, + 'update_device_up': True + } + registry.notify( + resources.PORT, events.AFTER_UPDATE, plugin, **kwargs) class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index 57faf6028c2..85ad1f86b54 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -56,6 +56,8 @@ class L3RouterPlugin(common_db_mixin.CommonDbMixin, cfg.CONF.router_scheduler_driver) self.start_periodic_l3_agent_status_check() super(L3RouterPlugin, self).__init__() + if 'dvr' in self.supported_extension_aliases: + l3_dvrscheduler_db.subscribe() def setup_rpc(self): # RPC support diff --git a/neutron/tests/unit/ml2/test_ml2_plugin.py b/neutron/tests/unit/ml2/test_ml2_plugin.py index 6bb263bd3d5..bb12b3893dd 100644 --- a/neutron/tests/unit/ml2/test_ml2_plugin.py +++ b/neutron/tests/unit/ml2/test_ml2_plugin.py @@ -1267,6 +1267,8 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase): def setUp(self): super(TestMl2PluginCreateUpdateDeletePort, self).setUp() self.context = mock.MagicMock() + self.notify_p = mock.patch('neutron.callbacks.registry.notify') + self.notify = self.notify_p.start() def _ensure_transaction_is_closed(self): transaction = self.context.session.begin(subtransactions=True) @@ -1284,9 +1286,8 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase): return_value=new_host_port) plugin._check_mac_update_allowed = mock.Mock(return_value=True) - plugin._notify_l3_agent_new_port = mock.Mock() - plugin._notify_l3_agent_new_port.side_effect = ( - lambda c, p: self._ensure_transaction_is_closed()) + self.notify.side_effect = ( + lambda r, e, t, **kwargs: self._ensure_transaction_is_closed()) return plugin @@ -1302,33 +1303,28 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase): plugin.create_port(self.context, mock.MagicMock()) - plugin._notify_l3_agent_new_port.assert_called_once_with( - self.context, new_host_port) + kwargs = {'context': self.context, 'port': new_host_port} + self.notify.assert_called_once_with('port', 'after_create', + plugin, **kwargs) def test_update_port_rpc_outside_transaction(self): with contextlib.nested( mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'), mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'), - mock.patch.object(manager.NeutronManager, 'get_service_plugins'), - ) as (init, super_update_port, get_service_plugins): + ) as (init, super_update_port): init.return_value = None - l3plugin = mock.Mock() - l3plugin.supported_extension_aliases = [ - constants.L3_DISTRIBUTED_EXT_ALIAS, - ] - get_service_plugins.return_value = { - service_constants.L3_ROUTER_NAT: l3plugin, - } - new_host_port = mock.Mock() plugin = self._create_plugin_for_create_update_port(new_host_port) plugin.update_port(self.context, 'fake_id', mock.MagicMock()) - plugin._notify_l3_agent_new_port.assert_called_once_with( - self.context, new_host_port) - l3plugin.dvr_vmarp_table_update.assert_called_once_with( - self.context, mock.ANY, "add") + kwargs = { + 'context': self.context, + 'port': new_host_port, + 'mac_address_updated': True, + } + self.notify.assert_called_once_with('port', 'after_update', + plugin, **kwargs) def test_vmarp_table_update_outside_of_delete_transaction(self): l3plugin = mock.Mock() diff --git a/neutron/tests/unit/ml2/test_rpcapi.py b/neutron/tests/unit/ml2/test_rpcapi.py index 0a4c0453f5b..2dfa71bca8a 100644 --- a/neutron/tests/unit/ml2/test_rpcapi.py +++ b/neutron/tests/unit/ml2/test_rpcapi.py @@ -44,45 +44,32 @@ class RpcCallbacksTestCase(base.BaseTestCase): self.type_manager) self.manager = mock.patch.object( plugin_rpc.manager, 'NeutronManager').start() - self.l3plugin = mock.Mock() - self.manager.get_service_plugins.return_value = { - 'L3_ROUTER_NAT': self.l3plugin - } self.plugin = self.manager.get_plugin() - def _test_update_device_up(self, extensions, kwargs): - with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin' - '._device_to_port_id'): - type(self.l3plugin).supported_extension_aliases = ( - mock.PropertyMock(return_value=extensions)) - self.callbacks.update_device_up(mock.ANY, **kwargs) - - def test_update_device_up_without_dvr(self): + def _test_update_device_up(self): kwargs = { 'agent_id': 'foo_agent', 'device': 'foo_device' } - self._test_update_device_up(['router'], kwargs) - self.assertFalse(self.l3plugin.dvr_vmarp_table_update.call_count) + with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin' + '._device_to_port_id'): + with mock.patch('neutron.callbacks.registry.notify') as notify: + self.callbacks.update_device_up(mock.Mock(), **kwargs) + return notify - def test_update_device_up_with_dvr(self): + def test_update_device_up_notify(self): + notify = self._test_update_device_up() kwargs = { - 'agent_id': 'foo_agent', - 'device': 'foo_device' + 'context': mock.ANY, 'port': mock.ANY, 'update_device_up': True } - self._test_update_device_up(['router', 'dvr'], kwargs) - self.l3plugin.dvr_vmarp_table_update.assert_called_once_with( - mock.ANY, mock.ANY, 'add') + notify.assert_called_once_with( + 'port', 'after_update', self.plugin, **kwargs) - def test_update_device_up_with_dvr_when_port_not_found(self): - kwargs = { - 'agent_id': 'foo_agent', - 'device': 'foo_device' - } - self.l3plugin.dvr_vmarp_table_update.side_effect = ( + def test_update_device_up_notify_not_sent_with_port_not_found(self): + self.plugin._get_port.side_effect = ( exceptions.PortNotFound(port_id='foo_port_id')) - self._test_update_device_up(['router', 'dvr'], kwargs) - self.assertTrue(self.l3plugin.dvr_vmarp_table_update.call_count) + notify = self._test_update_device_up() + self.assertFalse(notify.call_count) def test_get_device_details_without_port_context(self): self.plugin.get_bound_port_context.return_value = None