Remove RPC notification from transaction in create/update port

Removing notifications to the L3 agent from within the transaction in
create_port and update_port eliminates many lock wait timeouts in the
dvr check queue job and in scale testing locally.

Since this patch leaves context unused in _process_port_binding, the
argument is removed from the method.

Closes-Bug: #1371732

Change-Id: Ibd86611ad3e7eff085d769bdff777a5870f30c58
This commit is contained in:
Carl Baldwin 2014-09-19 17:37:17 +00:00
parent 6090d4d777
commit 411836b541
2 changed files with 90 additions and 12 deletions

View File

@ -159,7 +159,27 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# TODO(rkukura): Implement filtering. # TODO(rkukura): Implement filtering.
return nets return nets
def _process_port_binding(self, mech_context, context, attrs): def _notify_l3_agent_new_port(self, context, port):
if not port:
return
# Whenever a DVR serviceable port comes up on a
# node, it has to be communicated to the L3 Plugin
# and agent for creating the respective namespaces.
if (utils.is_dvr_serviced(port['device_owner'])):
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
if (utils.is_extension_supported(
l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)):
l3plugin.dvr_update_router_addvm(context, port)
def _get_host_port_if_changed(self, mech_context, attrs):
binding = mech_context._binding
host = attrs and attrs.get(portbindings.HOST_ID)
if (attributes.is_attr_set(host) and binding.host != host):
return mech_context.current
def _process_port_binding(self, mech_context, attrs):
binding = mech_context._binding binding = mech_context._binding
port = mech_context.current port = mech_context.current
changes = False changes = False
@ -169,15 +189,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
binding.host != host): binding.host != host):
binding.host = host binding.host = host
changes = True changes = True
# Whenever a DVR serviceable port comes up on a
# node, it has to be communicated to the L3 Plugin
# and agent for creating the respective namespaces.
if (utils.is_dvr_serviced(port['device_owner'])):
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
if (utils.is_extension_supported(
l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)):
l3plugin.dvr_update_router_addvm(context, port)
vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE) vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
if (attributes.is_attr_set(vnic_type) and if (attributes.is_attr_set(vnic_type) and
@ -770,7 +781,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
binding = db.add_port_binding(session, result['id']) binding = db.add_port_binding(session, result['id'])
mech_context = driver_context.PortContext(self, context, result, mech_context = driver_context.PortContext(self, context, result,
network, binding) network, binding)
self._process_port_binding(mech_context, context, attrs) new_host_port = self._get_host_port_if_changed(mech_context, attrs)
self._process_port_binding(mech_context, attrs)
result[addr_pair.ADDRESS_PAIRS] = ( result[addr_pair.ADDRESS_PAIRS] = (
self._process_create_allowed_address_pairs( self._process_create_allowed_address_pairs(
@ -780,6 +792,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
dhcp_opts) dhcp_opts)
self.mechanism_manager.create_port_precommit(mech_context) self.mechanism_manager.create_port_precommit(mech_context)
# Notification must be sent after the above transaction is complete
self._notify_l3_agent_new_port(context, new_host_port)
try: try:
self.mechanism_manager.create_port_postcommit(mech_context) self.mechanism_manager.create_port_postcommit(mech_context)
except ml2_exc.MechanismDriverError: except ml2_exc.MechanismDriverError:
@ -834,10 +849,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
mech_context = driver_context.PortContext( mech_context = driver_context.PortContext(
self, context, updated_port, network, binding, self, context, updated_port, network, binding,
original_port=original_port) original_port=original_port)
new_host_port = self._get_host_port_if_changed(mech_context, attrs)
need_port_update_notify |= self._process_port_binding( need_port_update_notify |= self._process_port_binding(
mech_context, context, attrs) mech_context, attrs)
self.mechanism_manager.update_port_precommit(mech_context) self.mechanism_manager.update_port_precommit(mech_context)
# Notification must be sent after the above transaction is complete
self._notify_l3_agent_new_port(context, new_host_port)
# TODO(apech) - handle errors raised by update_port, potentially # TODO(apech) - handle errors raised by update_port, potentially
# by re-calling update_port with the previous attributes. For # by re-calling update_port with the previous attributes. For
# now the error is propogated to the caller, which is expected to # now the error is propogated to the caller, which is expected to

View File

@ -23,6 +23,7 @@ from neutron.common import constants
from neutron.common import exceptions as exc from neutron.common import exceptions as exc
from neutron.common import utils from neutron.common import utils
from neutron import context from neutron import context
from neutron.db import db_base_plugin_v2 as base_plugin
from neutron.extensions import multiprovidernet as mpnet from neutron.extensions import multiprovidernet as mpnet
from neutron.extensions import portbindings from neutron.extensions import portbindings
from neutron.extensions import providernet as pnet from neutron.extensions import providernet as pnet
@ -36,6 +37,7 @@ from neutron.plugins.ml2 import driver_context
from neutron.plugins.ml2.drivers import type_vlan from neutron.plugins.ml2.drivers import type_vlan
from neutron.plugins.ml2 import models from neutron.plugins.ml2 import models
from neutron.plugins.ml2 import plugin as ml2_plugin from neutron.plugins.ml2 import plugin as ml2_plugin
from neutron.tests import base
from neutron.tests.unit import _test_extension_portbindings as test_bindings from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit.ml2.drivers import mechanism_logger as mech_logger from neutron.tests.unit.ml2.drivers import mechanism_logger as mech_logger
from neutron.tests.unit.ml2.drivers import mechanism_test as mech_test from neutron.tests.unit.ml2.drivers import mechanism_test as mech_test
@ -940,3 +942,60 @@ class TestFaultyMechansimDriver(Ml2PluginV2FaultyDriverTestCase):
self.assertEqual(new_name, port['port']['name']) self.assertEqual(new_name, port['port']['name'])
self._delete('ports', port['port']['id']) self._delete('ports', port['port']['id'])
class TestMl2PluginCreateUpdatePort(base.BaseTestCase):
def setUp(self):
super(TestMl2PluginCreateUpdatePort, self).setUp()
self.context = mock.MagicMock()
def _ensure_transaction_is_closed(self):
transaction = self.context.session.begin(subtransactions=True)
enter = transaction.__enter__.call_count
exit = transaction.__exit__.call_count
self.assertEqual(enter, exit)
def _create_plugin_for_create_update_port(self, new_host_port):
plugin = ml2_plugin.Ml2Plugin()
plugin.extension_manager = mock.Mock()
plugin.type_manager = mock.Mock()
plugin.mechanism_manager = mock.Mock()
plugin.notifier = mock.Mock()
plugin._get_host_port_if_changed = mock.Mock(
return_value=new_host_port)
plugin._notify_l3_agent_new_port = mock.Mock()
plugin._notify_l3_agent_new_port.side_effect = (
lambda c, p: self._ensure_transaction_is_closed())
return plugin
def test_create_port_rpc_outside_transaction(self):
with contextlib.nested(
mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
mock.patch.object(base_plugin.NeutronDbPluginV2, 'create_port'),
) as (init, super_create_port):
init.return_value = None
new_host_port = mock.Mock()
plugin = self._create_plugin_for_create_update_port(new_host_port)
plugin.create_port(self.context, mock.MagicMock())
plugin._notify_l3_agent_new_port.assert_called_once_with(
self.context, new_host_port)
def test_update_port_rpc_outside_transaction(self):
with contextlib.nested(
mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'),
) as (init, super_update_port):
init.return_value = None
new_host_port = mock.Mock()
plugin = self._create_plugin_for_create_update_port(new_host_port)
plugin.update_port(self.context, 'fake_id', mock.MagicMock())
plugin._notify_l3_agent_new_port.assert_called_once_with(
self.context, new_host_port)