Avoid long transaction in plugin.delete_ports()

db_plugin.delete_ports() called plugin.delete_port() under
a transaction. It leads to long transaction if plugin.delete_port
talks with external systems. This commit changes each delete_port
outside of a transaction to avoid longer transaction.

plugin.delete_ports is now called by release_dhcp_ports and
dhcp-agent ports can be deleted separately, so this changes
does not break the existing behavior.

delete_ports is renamed to delete_ports_by_device_id
to clarify the usage of this method.

NEC plugin already has this change and it is no longer needed.

_do_side_effect helper method in test_db_plugin is renamed
to more self-descriptive name.

Change-Id: Ied5883a57c7774c3b0778453d84c717b337f88c0
Closes-Bug: #1282925
Related-Bug: #1283522
This commit is contained in:
Akihiro Motoki 2014-03-07 15:58:46 +09:00
parent 1e81fdb1ea
commit e9e4683024
7 changed files with 78 additions and 73 deletions

View File

@ -1349,18 +1349,15 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
self._delete_port(context, id) self._delete_port(context, id)
def delete_ports(self, context, filters): def delete_ports_by_device_id(self, context, device_id, network_id=None):
with context.session.begin(subtransactions=True): query = (context.session.query(models_v2.Port.id)
# Disable eagerloads to avoid postgresql issues with outer joins .enable_eagerloads(False)
# and SELECT FOR UPDATE. This means that only filters for columns .filter(models_v2.Port.device_id == device_id))
# on the Port model will be effective, which is fine in nearly all if network_id:
# the cases where filters are used query = query.filter(models_v2.Port.network_id == network_id)
query = context.session.query( port_ids = [p[0] for p in query]
models_v2.Port).enable_eagerloads(False) for port_id in port_ids:
ports = self._apply_filters_to_query( self.delete_port(context, port_id)
query, models_v2.Port, filters).with_lockmode('update').all()
for port in ports:
self.delete_port(context, port['id'])
def _delete_port(self, context, id): def _delete_port(self, context, id):
query = (context.session.query(models_v2.Port). query = (context.session.query(models_v2.Port).

View File

@ -216,8 +216,7 @@ class DhcpRpcCallbackMixin(object):
'%(host)s'), '%(host)s'),
{'network_id': network_id, 'host': host}) {'network_id': network_id, 'host': host})
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
filters = dict(network_id=[network_id], device_id=[device_id]) plugin.delete_ports_by_device_id(context, device_id, network_id)
plugin.delete_ports(context, filters=filters)
def release_port_fixed_ip(self, context, **kwargs): def release_port_fixed_ip(self, context, **kwargs):
"""Release the fixed_ip associated the subnet on a port.""" """Release the fixed_ip associated the subnet on a port."""

View File

@ -31,7 +31,6 @@ from neutron.db import db_base_plugin_v2
from neutron.db import dhcp_rpc_base from neutron.db import dhcp_rpc_base
from neutron.db import external_net_db from neutron.db import external_net_db
from neutron.db import l3_rpc_base from neutron.db import l3_rpc_base
from neutron.db import models_v2
from neutron.db import portbindings_base from neutron.db import portbindings_base
from neutron.db import portbindings_db from neutron.db import portbindings_db
from neutron.db import quota_db # noqa from neutron.db import quota_db # noqa
@ -646,18 +645,6 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
super(NECPluginV2, self).delete_port(context, id) super(NECPluginV2, self).delete_port(context, id)
self.notify_security_groups_member_updated(context, port) self.notify_security_groups_member_updated(context, port)
def delete_ports(self, context, filters):
# Note(amotoki): Override the superclass method to avoid
# a long transaction over external API calls.
# TODO(amotoki): Need to revisit after bug 1282925 is addressed.
query = context.session.query(
models_v2.Port).enable_eagerloads(False)
query = self._apply_filters_to_query(
query, models_v2.Port, filters)
port_ids = [p['id'] for p in query]
for port_id in port_ids:
self.delete_port(context, port_id)
class NECPluginV2AgentNotifierApi(proxy.RpcProxy, class NECPluginV2AgentNotifierApi(proxy.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin): sg_rpc.SecurityGroupAgentRpcApiMixin):

View File

@ -859,6 +859,10 @@ class TestCiscoPortsV2(CiscoNetworkPluginV2TestCase,
data['device_id'], data['device_id'],
NEXUS_PORT_2) NEXUS_PORT_2)
def test_delete_ports_by_device_id_second_call_failure(self):
plugin_ref = self._get_plugin_ref()
self._test_delete_ports_by_device_id_second_call_failure(plugin_ref)
class TestCiscoNetworksV2(CiscoNetworkPluginV2TestCase, class TestCiscoNetworksV2(CiscoNetworkPluginV2TestCase,
test_db_plugin.TestNetworksV2): test_db_plugin.TestNetworksV2):

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import contextlib
import os import os
import fixtures import fixtures
@ -116,30 +115,6 @@ class TestNecV2HTTPResponse(test_plugin.TestV2HTTPResponse,
pass pass
class TestNecPortsV2(test_plugin.TestPortsV2, NecPluginV2TestCase):
def test_delete_ports(self):
with self.subnet() as subnet:
with contextlib.nested(
self.port(subnet=subnet, device_owner='test-owner',
no_delete=True),
self.port(subnet=subnet, device_owner='test-owner',
no_delete=True),
self.port(subnet=subnet, device_owner='other-owner'),
) as (p1, p2, p3):
network_id = subnet['subnet']['network_id']
filters = {'network_id': [network_id],
'device_owner': ['test-owner']}
self.plugin.delete_ports(self.context, filters)
self._show('ports', p1['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
self._show('ports', p2['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
self._show('ports', p3['port']['id'],
expected_code=webob.exc.HTTPOk.code)
class TestNecNetworksV2(test_plugin.TestNetworksV2, NecPluginV2TestCase): class TestNecNetworksV2(test_plugin.TestNetworksV2, NecPluginV2TestCase):
pass pass

View File

@ -479,7 +479,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
self.assertEqual(res.status_int, webob.exc.HTTPOk.code) self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
return self.deserialize(fmt, res) return self.deserialize(fmt, res)
def _do_side_effect(self, patched_plugin, orig, *args, **kwargs): def _fail_second_call(self, patched_plugin, orig, *args, **kwargs):
"""Invoked by test cases for injecting failures in plugin.""" """Invoked by test cases for injecting failures in plugin."""
def second_call(*args, **kwargs): def second_call(*args, **kwargs):
raise n_exc.NeutronException() raise n_exc.NeutronException()
@ -901,7 +901,7 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
'create_port') as patched_plugin: 'create_port') as patched_plugin:
def side_effect(*args, **kwargs): def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig, return self._fail_second_call(patched_plugin, orig,
*args, **kwargs) *args, **kwargs)
patched_plugin.side_effect = side_effect patched_plugin.side_effect = side_effect
@ -925,7 +925,7 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
'create_port') as patched_plugin: 'create_port') as patched_plugin:
def side_effect(*args, **kwargs): def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig, return self._fail_second_call(patched_plugin, orig,
*args, **kwargs) *args, **kwargs)
patched_plugin.side_effect = side_effect patched_plugin.side_effect = side_effect
@ -1655,6 +1655,56 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
self.assertEqual(res.status_int, self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code) webob.exc.HTTPClientError.code)
def test_delete_ports_by_device_id(self):
plugin = NeutronManager.get_plugin()
ctx = context.get_admin_context()
with self.subnet() as subnet:
with contextlib.nested(
self.port(subnet=subnet, device_id='owner1', no_delete=True),
self.port(subnet=subnet, device_id='owner1', no_delete=True),
self.port(subnet=subnet, device_id='owner2'),
) as (p1, p2, p3):
network_id = subnet['subnet']['network_id']
plugin.delete_ports_by_device_id(ctx, 'owner1',
network_id)
self._show('ports', p1['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
self._show('ports', p2['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
self._show('ports', p3['port']['id'],
expected_code=webob.exc.HTTPOk.code)
def _test_delete_ports_by_device_id_second_call_failure(self, plugin):
ctx = context.get_admin_context()
with self.subnet() as subnet:
with contextlib.nested(
self.port(subnet=subnet, device_id='owner1', no_delete=True),
self.port(subnet=subnet, device_id='owner1'),
self.port(subnet=subnet, device_id='owner2'),
) as (p1, p2, p3):
orig = plugin.delete_port
with mock.patch.object(plugin, 'delete_port') as del_port:
def side_effect(*args, **kwargs):
return self._fail_second_call(del_port, orig,
*args, **kwargs)
del_port.side_effect = side_effect
network_id = subnet['subnet']['network_id']
self.assertRaises(n_exc.NeutronException,
plugin.delete_ports_by_device_id,
ctx, 'owner1', network_id)
self._show('ports', p1['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
self._show('ports', p2['port']['id'],
expected_code=webob.exc.HTTPOk.code)
self._show('ports', p3['port']['id'],
expected_code=webob.exc.HTTPOk.code)
def test_delete_ports_by_device_id_second_call_failure(self):
plugin = NeutronManager.get_plugin()
self._test_delete_ports_by_device_id_second_call_failure(plugin)
class TestNetworksV2(NeutronDbPluginV2TestCase): class TestNetworksV2(NeutronDbPluginV2TestCase):
# NOTE(cerberus): successful network update and delete are # NOTE(cerberus): successful network update and delete are
@ -1915,7 +1965,7 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
'create_network') as patched_plugin: 'create_network') as patched_plugin:
def side_effect(*args, **kwargs): def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig, return self._fail_second_call(patched_plugin, orig,
*args, **kwargs) *args, **kwargs)
patched_plugin.side_effect = side_effect patched_plugin.side_effect = side_effect
@ -1933,7 +1983,7 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
'create_network') as patched_plugin: 'create_network') as patched_plugin:
def side_effect(*args, **kwargs): def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig, return self._fail_second_call(patched_plugin, orig,
*args, **kwargs) *args, **kwargs)
patched_plugin.side_effect = side_effect patched_plugin.side_effect = side_effect
@ -2343,7 +2393,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
'create_subnet') as patched_plugin: 'create_subnet') as patched_plugin:
def side_effect(*args, **kwargs): def side_effect(*args, **kwargs):
self._do_side_effect(patched_plugin, orig, self._fail_second_call(patched_plugin, orig,
*args, **kwargs) *args, **kwargs)
patched_plugin.side_effect = side_effect patched_plugin.side_effect = side_effect
@ -2363,7 +2413,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
with mock.patch.object(NeutronManager._instance.plugin, with mock.patch.object(NeutronManager._instance.plugin,
'create_subnet') as patched_plugin: 'create_subnet') as patched_plugin:
def side_effect(*args, **kwargs): def side_effect(*args, **kwargs):
return self._do_side_effect(patched_plugin, orig, return self._fail_second_call(patched_plugin, orig,
*args, **kwargs) *args, **kwargs)
patched_plugin.side_effect = side_effect patched_plugin.side_effect = side_effect

View File

@ -34,11 +34,6 @@ class TestDhcpRpcCallbackMixin(base.BaseTestCase):
self.log_p = mock.patch('neutron.db.dhcp_rpc_base.LOG') self.log_p = mock.patch('neutron.db.dhcp_rpc_base.LOG')
self.log = self.log_p.start() self.log = self.log_p.start()
def tearDown(self):
self.log_p.stop()
self.plugin_p.stop()
super(TestDhcpRpcCallbackMixin, self).tearDown()
def test_get_active_networks(self): def test_get_active_networks(self):
plugin_retval = [dict(id='a'), dict(id='b')] plugin_retval = [dict(id='a'), dict(id='b')]
self.plugin.get_networks.return_value = plugin_retval self.plugin.get_networks.return_value = plugin_retval
@ -221,9 +216,7 @@ class TestDhcpRpcCallbackMixin(base.BaseTestCase):
device_id='devid') device_id='devid')
self.plugin.assert_has_calls([ self.plugin.assert_has_calls([
mock.call.delete_ports(mock.ANY, mock.call.delete_ports_by_device_id(mock.ANY, 'devid', 'netid')])
filters=dict(network_id=['netid'],
device_id=['devid']))])
def test_release_port_fixed_ip(self): def test_release_port_fixed_ip(self):
port_retval = dict(id='port_id', fixed_ips=[dict(subnet_id='a')]) port_retval = dict(id='port_id', fixed_ips=[dict(subnet_id='a')])