From 411836b5411411a6046043e0264aaa7b6f5760f0 Mon Sep 17 00:00:00 2001 From: Carl Baldwin Date: Fri, 19 Sep 2014 17:37:17 +0000 Subject: [PATCH] Remove RPC notification from transaction in create/update port Removing notifications to the L3 agent from within the transaction in create_port and update_port eliminates many lock wait timeouts in the dvr check queue job and in scale testing locally. Since this patch leaves context unused in _process_port_binding, the argument is removed from the method. Closes-Bug: #1371732 Change-Id: Ibd86611ad3e7eff085d769bdff777a5870f30c58 --- neutron/plugins/ml2/plugin.py | 43 ++++++++++++----- neutron/tests/unit/ml2/test_ml2_plugin.py | 59 +++++++++++++++++++++++ 2 files changed, 90 insertions(+), 12 deletions(-) diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index e6a8d9040..c1e1627b6 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -159,7 +159,27 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, # TODO(rkukura): Implement filtering. return nets - def _process_port_binding(self, mech_context, context, attrs): + def _notify_l3_agent_new_port(self, context, port): + if not port: + return + + # Whenever a DVR serviceable port comes up on a + # node, it has to be communicated to the L3 Plugin + # and agent for creating the respective namespaces. + if (utils.is_dvr_serviced(port['device_owner'])): + l3plugin = manager.NeutronManager.get_service_plugins().get( + service_constants.L3_ROUTER_NAT) + if (utils.is_extension_supported( + l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)): + l3plugin.dvr_update_router_addvm(context, port) + + def _get_host_port_if_changed(self, mech_context, attrs): + binding = mech_context._binding + host = attrs and attrs.get(portbindings.HOST_ID) + if (attributes.is_attr_set(host) and binding.host != host): + return mech_context.current + + def _process_port_binding(self, mech_context, attrs): binding = mech_context._binding port = mech_context.current changes = False @@ -169,15 +189,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, binding.host != host): binding.host = host changes = True - # Whenever a DVR serviceable port comes up on a - # node, it has to be communicated to the L3 Plugin - # and agent for creating the respective namespaces. - if (utils.is_dvr_serviced(port['device_owner'])): - l3plugin = manager.NeutronManager.get_service_plugins().get( - service_constants.L3_ROUTER_NAT) - if (utils.is_extension_supported( - l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)): - l3plugin.dvr_update_router_addvm(context, port) vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE) if (attributes.is_attr_set(vnic_type) and @@ -770,7 +781,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, binding = db.add_port_binding(session, result['id']) mech_context = driver_context.PortContext(self, context, result, network, binding) - self._process_port_binding(mech_context, context, attrs) + new_host_port = self._get_host_port_if_changed(mech_context, attrs) + self._process_port_binding(mech_context, attrs) result[addr_pair.ADDRESS_PAIRS] = ( self._process_create_allowed_address_pairs( @@ -780,6 +792,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, dhcp_opts) self.mechanism_manager.create_port_precommit(mech_context) + # Notification must be sent after the above transaction is complete + self._notify_l3_agent_new_port(context, new_host_port) + try: self.mechanism_manager.create_port_postcommit(mech_context) except ml2_exc.MechanismDriverError: @@ -834,10 +849,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, mech_context = driver_context.PortContext( self, context, updated_port, network, binding, original_port=original_port) + new_host_port = self._get_host_port_if_changed(mech_context, attrs) need_port_update_notify |= self._process_port_binding( - mech_context, context, attrs) + mech_context, attrs) self.mechanism_manager.update_port_precommit(mech_context) + # Notification must be sent after the above transaction is complete + self._notify_l3_agent_new_port(context, new_host_port) + # TODO(apech) - handle errors raised by update_port, potentially # by re-calling update_port with the previous attributes. For # now the error is propogated to the caller, which is expected to diff --git a/neutron/tests/unit/ml2/test_ml2_plugin.py b/neutron/tests/unit/ml2/test_ml2_plugin.py index ca0a061f9..1996979ab 100644 --- a/neutron/tests/unit/ml2/test_ml2_plugin.py +++ b/neutron/tests/unit/ml2/test_ml2_plugin.py @@ -23,6 +23,7 @@ from neutron.common import constants from neutron.common import exceptions as exc from neutron.common import utils from neutron import context +from neutron.db import db_base_plugin_v2 as base_plugin from neutron.extensions import multiprovidernet as mpnet from neutron.extensions import portbindings from neutron.extensions import providernet as pnet @@ -36,6 +37,7 @@ from neutron.plugins.ml2 import driver_context from neutron.plugins.ml2.drivers import type_vlan from neutron.plugins.ml2 import models from neutron.plugins.ml2 import plugin as ml2_plugin +from neutron.tests import base from neutron.tests.unit import _test_extension_portbindings as test_bindings from neutron.tests.unit.ml2.drivers import mechanism_logger as mech_logger from neutron.tests.unit.ml2.drivers import mechanism_test as mech_test @@ -940,3 +942,60 @@ class TestFaultyMechansimDriver(Ml2PluginV2FaultyDriverTestCase): self.assertEqual(new_name, port['port']['name']) self._delete('ports', port['port']['id']) + + +class TestMl2PluginCreateUpdatePort(base.BaseTestCase): + def setUp(self): + super(TestMl2PluginCreateUpdatePort, self).setUp() + self.context = mock.MagicMock() + + def _ensure_transaction_is_closed(self): + transaction = self.context.session.begin(subtransactions=True) + enter = transaction.__enter__.call_count + exit = transaction.__exit__.call_count + self.assertEqual(enter, exit) + + def _create_plugin_for_create_update_port(self, new_host_port): + plugin = ml2_plugin.Ml2Plugin() + plugin.extension_manager = mock.Mock() + plugin.type_manager = mock.Mock() + plugin.mechanism_manager = mock.Mock() + plugin.notifier = mock.Mock() + plugin._get_host_port_if_changed = mock.Mock( + return_value=new_host_port) + + plugin._notify_l3_agent_new_port = mock.Mock() + plugin._notify_l3_agent_new_port.side_effect = ( + lambda c, p: self._ensure_transaction_is_closed()) + + return plugin + + def test_create_port_rpc_outside_transaction(self): + with contextlib.nested( + mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'), + mock.patch.object(base_plugin.NeutronDbPluginV2, 'create_port'), + ) as (init, super_create_port): + init.return_value = None + + new_host_port = mock.Mock() + plugin = self._create_plugin_for_create_update_port(new_host_port) + + plugin.create_port(self.context, mock.MagicMock()) + + plugin._notify_l3_agent_new_port.assert_called_once_with( + self.context, new_host_port) + + def test_update_port_rpc_outside_transaction(self): + with contextlib.nested( + mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'), + mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'), + ) as (init, super_update_port): + init.return_value = None + + new_host_port = mock.Mock() + plugin = self._create_plugin_for_create_update_port(new_host_port) + + plugin.update_port(self.context, 'fake_id', mock.MagicMock()) + + plugin._notify_l3_agent_new_port.assert_called_once_with( + self.context, new_host_port)