From 3f3874717c07e2b469ea6c6fd52bcb4da7b380c7 Mon Sep 17 00:00:00 2001 From: armando-migliaccio Date: Thu, 5 Feb 2015 08:40:04 -0800 Subject: [PATCH] Break coupling between ML2 and L3 during create/update operations This is an initial patch in a series that, by using an event framework, cleans up the relationship between ML2 and L3, so that they are no longer tightly coupled. A follow-up will take address the coupling during the port delete operation. The newly introduced notification hooks not only benefit the L3 service plugin, but any other plugin that has an interest in knowing about port events. Long term, the notification bits can move in a more 'common' place so that other plugins can take advantage of them, but as mentioned in a parent patch, the perestroika is not quite there yet. Related-blueprint: services-split Related-blueprint: plugin-interface-perestroika Change-Id: I6527b1cb53a71a1f68329a7a3b1878094558f8c2 --- neutron/db/l3_dvrscheduler_db.py | 29 ++++++++++++ neutron/plugins/ml2/plugin.py | 40 ++++++---------- neutron/plugins/ml2/rpc.py | 30 +++++++----- .../services/l3_router/l3_router_plugin.py | 2 + neutron/tests/unit/ml2/test_ml2_plugin.py | 34 ++++++-------- neutron/tests/unit/ml2/test_rpcapi.py | 47 +++++++------------ 6 files changed, 96 insertions(+), 86 deletions(-) diff --git a/neutron/db/l3_dvrscheduler_db.py b/neutron/db/l3_dvrscheduler_db.py index e367381be53..e67c97f631b 100644 --- a/neutron/db/l3_dvrscheduler_db.py +++ b/neutron/db/l3_dvrscheduler_db.py @@ -22,6 +22,9 @@ from sqlalchemy import orm from sqlalchemy.orm import exc from sqlalchemy.orm import joinedload +from neutron.callbacks import events +from neutron.callbacks import registry +from neutron.callbacks import resources from neutron.common import constants as q_const from neutron.common import utils as n_utils from neutron.db import agents_db @@ -29,6 +32,8 @@ from neutron.db import l3_agentschedulers_db as l3agent_sch_db from neutron.db import model_base from neutron.db import models_v2 from neutron.i18n import _LI, _LW +from neutron import manager +from neutron.plugins.common import constants as service_constants from neutron.plugins.ml2 import db as ml2_db LOG = logging.getLogger(__name__) @@ -309,3 +314,27 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin): self.bind_dvr_router_servicenode( context, router_id, chosen_agent) return chosen_agent + + +def _notify_l3_agent_new_port(resource, event, trigger, **kwargs): + LOG.debug('Received %s %s' % (resource, event)) + port = kwargs['port'] + if not port: + return + + l3plugin = manager.NeutronManager.get_service_plugins().get( + service_constants.L3_ROUTER_NAT) + mac_address_updated = kwargs.get('mac_address_updated') + update_device_up = kwargs.get('update_device_up') + context = kwargs['context'] + if mac_address_updated or update_device_up: + l3plugin.dvr_vmarp_table_update(context, port, "add") + if n_utils.is_dvr_serviced(port['device_owner']): + l3plugin.dvr_update_router_addvm(context, port) + + +def subscribe(): + registry.subscribe( + _notify_l3_agent_new_port, resources.PORT, events.AFTER_UPDATE) + registry.subscribe( + _notify_l3_agent_new_port, resources.PORT, events.AFTER_CREATE) diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 80c0f053a43..99679c440fe 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -33,6 +33,9 @@ from neutron.api.rpc.handlers import dvr_rpc from neutron.api.rpc.handlers import metadata_rpc from neutron.api.rpc.handlers import securitygroups_rpc from neutron.api.v2 import attributes +from neutron.callbacks import events +from neutron.callbacks import registry +from neutron.callbacks import resources from neutron.common import constants as const from neutron.common import exceptions as exc from neutron.common import ipv6_utils @@ -161,20 +164,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, if self.type_manager.network_matches_filters(network, filters) ] - def _notify_l3_agent_new_port(self, context, port): - if not port: - return - - # Whenever a DVR serviceable port comes up on a - # node, it has to be communicated to the L3 Plugin - # and agent for creating the respective namespaces. - if (utils.is_dvr_serviced(port['device_owner'])): - l3plugin = manager.NeutronManager.get_service_plugins().get( - service_constants.L3_ROUTER_NAT) - if (utils.is_extension_supported( - l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)): - l3plugin.dvr_update_router_addvm(context, port) - def _get_host_port_if_changed(self, mech_context, attrs): binding = mech_context._binding host = attrs and attrs.get(portbindings.HOST_ID) @@ -936,7 +925,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, attrs = port[attributes.PORT] result, mech_context = self._create_port_db(context, port) new_host_port = self._get_host_port_if_changed(mech_context, attrs) - self._notify_l3_agent_new_port(context, new_host_port) + # notify any plugin that is interested in port create events + kwargs = {'context': context, 'port': new_host_port} + registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs) try: self.mechanism_manager.create_port_postcommit(mech_context) @@ -972,7 +963,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, if attrs and attrs.get(portbindings.HOST_ID): new_host_port = self._get_host_port_if_changed( obj['mech_context'], attrs) - self._notify_l3_agent_new_port(context, new_host_port) + kwargs = {'context': context, 'port': new_host_port} + registry.notify( + resources.PORT, events.AFTER_CREATE, self, **kwargs) try: for obj in objects: @@ -990,11 +983,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, def update_port(self, context, id, port): attrs = port[attributes.PORT] need_port_update_notify = False - l3plugin = manager.NeutronManager.get_service_plugins().get( - service_constants.L3_ROUTER_NAT) - is_dvr_enabled = utils.is_extension_supported( - l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS) - session = context.session # REVISIT: Serialize this operation with a semaphore to @@ -1034,10 +1022,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.mechanism_manager.update_port_precommit(mech_context) # Notifications must be sent after the above transaction is complete - if mac_address_updated and l3plugin and is_dvr_enabled: - # NOTE: "add" actually does a 'replace' operation - l3plugin.dvr_vmarp_table_update(context, updated_port, "add") - self._notify_l3_agent_new_port(context, new_host_port) + kwargs = { + 'context': context, + 'port': new_host_port, + 'mac_address_updated': mac_address_updated, + } + registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs) # TODO(apech) - handle errors raised by update_port, potentially # by re-calling update_port with the previous attributes. For diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 8e1a55d9c76..d46e7958915 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -19,15 +19,16 @@ from sqlalchemy.orm import exc from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.handlers import dvr_rpc +from neutron.callbacks import events +from neutron.callbacks import registry +from neutron.callbacks import resources from neutron.common import constants as q_const from neutron.common import exceptions from neutron.common import rpc as n_rpc from neutron.common import topics -from neutron.common import utils from neutron.extensions import portbindings from neutron.i18n import _LW from neutron import manager -from neutron.plugins.common import constants as service_constants from neutron.plugins.ml2 import driver_api as api from neutron.plugins.ml2.drivers import type_tunnel # REVISIT(kmestery): Allow the type and mechanism drivers to supply the @@ -167,16 +168,21 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): port_id = plugin.update_port_status(rpc_context, port_id, q_const.PORT_STATUS_ACTIVE, host) - l3plugin = manager.NeutronManager.get_service_plugins().get( - service_constants.L3_ROUTER_NAT) - if (l3plugin and - utils.is_extension_supported(l3plugin, - q_const.L3_DISTRIBUTED_EXT_ALIAS)): - try: - port = plugin._get_port(rpc_context, port_id) - l3plugin.dvr_vmarp_table_update(rpc_context, port, "add") - except exceptions.PortNotFound: - LOG.debug('Port %s not found during ARP update', port_id) + try: + # NOTE(armax): it's best to remove all objects from the + # session, before we try to retrieve the new port object + rpc_context.session.expunge_all() + port = plugin._get_port(rpc_context, port_id) + except exceptions.PortNotFound: + LOG.debug('Port %s not found during update', port_id) + else: + kwargs = { + 'context': rpc_context, + 'port': port, + 'update_device_up': True + } + registry.notify( + resources.PORT, events.AFTER_UPDATE, plugin, **kwargs) class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index 57faf6028c2..85ad1f86b54 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -56,6 +56,8 @@ class L3RouterPlugin(common_db_mixin.CommonDbMixin, cfg.CONF.router_scheduler_driver) self.start_periodic_l3_agent_status_check() super(L3RouterPlugin, self).__init__() + if 'dvr' in self.supported_extension_aliases: + l3_dvrscheduler_db.subscribe() def setup_rpc(self): # RPC support diff --git a/neutron/tests/unit/ml2/test_ml2_plugin.py b/neutron/tests/unit/ml2/test_ml2_plugin.py index 2ae3f7476bf..3e87842a702 100644 --- a/neutron/tests/unit/ml2/test_ml2_plugin.py +++ b/neutron/tests/unit/ml2/test_ml2_plugin.py @@ -1251,6 +1251,8 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase): def setUp(self): super(TestMl2PluginCreateUpdateDeletePort, self).setUp() self.context = mock.MagicMock() + self.notify_p = mock.patch('neutron.callbacks.registry.notify') + self.notify = self.notify_p.start() def _ensure_transaction_is_closed(self): transaction = self.context.session.begin(subtransactions=True) @@ -1268,9 +1270,8 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase): return_value=new_host_port) plugin._check_mac_update_allowed = mock.Mock(return_value=True) - plugin._notify_l3_agent_new_port = mock.Mock() - plugin._notify_l3_agent_new_port.side_effect = ( - lambda c, p: self._ensure_transaction_is_closed()) + self.notify.side_effect = ( + lambda r, e, t, **kwargs: self._ensure_transaction_is_closed()) return plugin @@ -1286,33 +1287,28 @@ class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase): plugin.create_port(self.context, mock.MagicMock()) - plugin._notify_l3_agent_new_port.assert_called_once_with( - self.context, new_host_port) + kwargs = {'context': self.context, 'port': new_host_port} + self.notify.assert_called_once_with('port', 'after_create', + plugin, **kwargs) def test_update_port_rpc_outside_transaction(self): with contextlib.nested( mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'), mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'), - mock.patch.object(manager.NeutronManager, 'get_service_plugins'), - ) as (init, super_update_port, get_service_plugins): + ) as (init, super_update_port): init.return_value = None - l3plugin = mock.Mock() - l3plugin.supported_extension_aliases = [ - constants.L3_DISTRIBUTED_EXT_ALIAS, - ] - get_service_plugins.return_value = { - service_constants.L3_ROUTER_NAT: l3plugin, - } - new_host_port = mock.Mock() plugin = self._create_plugin_for_create_update_port(new_host_port) plugin.update_port(self.context, 'fake_id', mock.MagicMock()) - plugin._notify_l3_agent_new_port.assert_called_once_with( - self.context, new_host_port) - l3plugin.dvr_vmarp_table_update.assert_called_once_with( - self.context, mock.ANY, "add") + kwargs = { + 'context': self.context, + 'port': new_host_port, + 'mac_address_updated': True, + } + self.notify.assert_called_once_with('port', 'after_update', + plugin, **kwargs) def test_vmarp_table_update_outside_of_delete_transaction(self): l3plugin = mock.Mock() diff --git a/neutron/tests/unit/ml2/test_rpcapi.py b/neutron/tests/unit/ml2/test_rpcapi.py index efb1dbd5784..a22445cafa0 100644 --- a/neutron/tests/unit/ml2/test_rpcapi.py +++ b/neutron/tests/unit/ml2/test_rpcapi.py @@ -44,45 +44,32 @@ class RpcCallbacksTestCase(base.BaseTestCase): self.type_manager) self.manager = mock.patch.object( plugin_rpc.manager, 'NeutronManager').start() - self.l3plugin = mock.Mock() - self.manager.get_service_plugins.return_value = { - 'L3_ROUTER_NAT': self.l3plugin - } self.plugin = self.manager.get_plugin() - def _test_update_device_up(self, extensions, kwargs): + def _test_update_device_up(self): + kwargs = { + 'agent_id': 'foo_agent', + 'device': 'foo_device' + } with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin' '._device_to_port_id'): - type(self.l3plugin).supported_extension_aliases = ( - mock.PropertyMock(return_value=extensions)) - self.callbacks.update_device_up(mock.ANY, **kwargs) + with mock.patch('neutron.callbacks.registry.notify') as notify: + self.callbacks.update_device_up(mock.Mock(), **kwargs) + return notify - def test_update_device_up_without_dvr(self): + def test_update_device_up_notify(self): + notify = self._test_update_device_up() kwargs = { - 'agent_id': 'foo_agent', - 'device': 'foo_device' + 'context': mock.ANY, 'port': mock.ANY, 'update_device_up': True } - self._test_update_device_up(['router'], kwargs) - self.assertFalse(self.l3plugin.dvr_vmarp_table_update.call_count) + notify.assert_called_once_with( + 'port', 'after_update', self.plugin, **kwargs) - def test_update_device_up_with_dvr(self): - kwargs = { - 'agent_id': 'foo_agent', - 'device': 'foo_device' - } - self._test_update_device_up(['router', 'dvr'], kwargs) - self.l3plugin.dvr_vmarp_table_update.assert_called_once_with( - mock.ANY, mock.ANY, 'add') - - def test_update_device_up_with_dvr_when_port_not_found(self): - kwargs = { - 'agent_id': 'foo_agent', - 'device': 'foo_device' - } - self.l3plugin.dvr_vmarp_table_update.side_effect = ( + def test_update_device_up_notify_not_sent_with_port_not_found(self): + self.plugin._get_port.side_effect = ( exceptions.PortNotFound(port_id='foo_port_id')) - self._test_update_device_up(['router', 'dvr'], kwargs) - self.assertTrue(self.l3plugin.dvr_vmarp_table_update.call_count) + notify = self._test_update_device_up() + self.assertFalse(notify.call_count) def test_get_device_details_without_port_context(self): self.plugin.get_bound_port_context.return_value = None