Move postcommit ops out of transaction for bulk
Currently, the bulk create operations in ML2 are executed in a transaction. This means all precommit and postcommit operations for such operations are in a transaction. Postcommit operations are expected to be executed outside of transactions as they may communicate with a backend and introduce substantial delays. This fix removes the postcommit operations from the transaction for bulk create network/subnet/port operations. Change-Id: I9a9683058088e50d9443040223232bf5e1396ccf Closes-Bug: #1193861
This commit is contained in:
parent
5de1d2ed67
commit
2ee08c3464
|
@ -55,8 +55,6 @@ class BigSwitchMechanismDriver(plugin.NeutronRestProxyV2Base,
|
|||
# register plugin config opts
|
||||
pl_config.register_config()
|
||||
self.evpool = eventlet.GreenPool(cfg.CONF.RESTPROXY.thread_pool_size)
|
||||
# backend doesn't support bulk operations yet
|
||||
self.native_bulk_support = False
|
||||
|
||||
# init network ctrl connections
|
||||
self.servers = servermanager.ServerPool()
|
||||
|
|
|
@ -267,13 +267,9 @@ class MechanismManager(stevedore.named.NamedExtensionManager):
|
|||
[driver.name for driver in self.ordered_mech_drivers])
|
||||
|
||||
def initialize(self):
|
||||
# For ML2 to support bulk operations, each driver must support them
|
||||
self.native_bulk_support = True
|
||||
for driver in self.ordered_mech_drivers:
|
||||
LOG.info(_LI("Initializing mechanism driver '%s'"), driver.name)
|
||||
driver.obj.initialize()
|
||||
self.native_bulk_support &= getattr(driver.obj,
|
||||
'native_bulk_support', True)
|
||||
|
||||
def _call_on_drivers(self, method_name, context,
|
||||
continue_on_failure=False):
|
||||
|
|
|
@ -126,8 +126,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self.type_manager.initialize()
|
||||
self.extension_manager.initialize()
|
||||
self.mechanism_manager.initialize()
|
||||
# bulk support depends on the underlying drivers
|
||||
self.__native_bulk_support = self.mechanism_manager.native_bulk_support
|
||||
|
||||
self._setup_rpc()
|
||||
|
||||
|
@ -485,10 +483,59 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
segment[api.SEGMENTATION_ID],
|
||||
segment[api.PHYSICAL_NETWORK])
|
||||
|
||||
# TODO(apech): Need to override bulk operations
|
||||
def _delete_objects(self, context, resource, objects):
|
||||
delete_op = getattr(self, 'delete_%s' % resource)
|
||||
for obj in objects:
|
||||
try:
|
||||
delete_op(context, obj['result']['id'])
|
||||
except KeyError:
|
||||
LOG.exception(_LE("Could not find %s to delete."),
|
||||
resource)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Could not delete %(res)s %(id)s."),
|
||||
{'res': resource,
|
||||
'id': obj['result']['id']})
|
||||
|
||||
def create_network(self, context, network):
|
||||
net_data = network['network']
|
||||
def _create_bulk_ml2(self, resource, context, request_items):
|
||||
objects = []
|
||||
collection = "%ss" % resource
|
||||
items = request_items[collection]
|
||||
try:
|
||||
with context.session.begin(subtransactions=True):
|
||||
obj_creator = getattr(self, '_create_%s_db' % resource)
|
||||
for item in items:
|
||||
attrs = item[resource]
|
||||
result, mech_context = obj_creator(context, item)
|
||||
objects.append({'mech_context': mech_context,
|
||||
'result': result,
|
||||
'attributes': attrs})
|
||||
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.exception(_LE("An exception occurred while creating "
|
||||
"the %(resource)s:%(item)s"),
|
||||
{'resource': resource, 'item': item})
|
||||
|
||||
try:
|
||||
postcommit_op = getattr(self.mechanism_manager,
|
||||
'create_%s_postcommit' % resource)
|
||||
for obj in objects:
|
||||
postcommit_op(obj['mech_context'])
|
||||
return objects
|
||||
except ml2_exc.MechanismDriverError:
|
||||
with excutils.save_and_reraise_exception():
|
||||
resource_ids = [res['result']['id'] for res in objects]
|
||||
LOG.exception(_LE("mechanism_manager.create_%(res)s"
|
||||
"_postcommit failed for %(res)s: "
|
||||
"'%(failed_id)s'. Deleting "
|
||||
"%(res)ss %(resource_ids)s"),
|
||||
{'res': resource,
|
||||
'failed_id': obj['result']['id'],
|
||||
'resource_ids': ', '.join(resource_ids)})
|
||||
self._delete_objects(context, resource, objects)
|
||||
|
||||
def _create_network_db(self, context, network):
|
||||
net_data = network[attributes.NETWORK]
|
||||
tenant_id = self._get_tenant_id_for_create(context, net_data)
|
||||
session = context.session
|
||||
with session.begin(subtransactions=True):
|
||||
|
@ -504,7 +551,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
mech_context = driver_context.NetworkContext(self, context,
|
||||
result)
|
||||
self.mechanism_manager.create_network_precommit(mech_context)
|
||||
return result, mech_context
|
||||
|
||||
def create_network(self, context, network):
|
||||
result, mech_context = self._create_network_db(context, network)
|
||||
try:
|
||||
self.mechanism_manager.create_network_postcommit(mech_context)
|
||||
except ml2_exc.MechanismDriverError:
|
||||
|
@ -512,8 +562,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
LOG.error(_LE("mechanism_manager.create_network_postcommit "
|
||||
"failed, deleting network '%s'"), result['id'])
|
||||
self.delete_network(context, result['id'])
|
||||
|
||||
return result
|
||||
|
||||
def create_network_bulk(self, context, networks):
|
||||
objects = self._create_bulk_ml2(attributes.NETWORK, context, networks)
|
||||
return [obj['result'] for obj in objects]
|
||||
|
||||
def update_network(self, context, id, network):
|
||||
provider._raise_if_updates_provider_attributes(network['network'])
|
||||
|
||||
|
@ -661,7 +716,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
" failed"))
|
||||
self.notifier.network_delete(context, id)
|
||||
|
||||
def create_subnet(self, context, subnet):
|
||||
def _create_subnet_db(self, context, subnet):
|
||||
session = context.session
|
||||
with session.begin(subtransactions=True):
|
||||
result = super(Ml2Plugin, self).create_subnet(context, subnet)
|
||||
|
@ -670,6 +725,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
mech_context = driver_context.SubnetContext(self, context, result)
|
||||
self.mechanism_manager.create_subnet_precommit(mech_context)
|
||||
|
||||
return result, mech_context
|
||||
|
||||
def create_subnet(self, context, subnet):
|
||||
result, mech_context = self._create_subnet_db(context, subnet)
|
||||
try:
|
||||
self.mechanism_manager.create_subnet_postcommit(mech_context)
|
||||
except ml2_exc.MechanismDriverError:
|
||||
|
@ -679,6 +738,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self.delete_subnet(context, result['id'])
|
||||
return result
|
||||
|
||||
def create_subnet_bulk(self, context, subnets):
|
||||
objects = self._create_bulk_ml2(attributes.SUBNET, context, subnets)
|
||||
return [obj['result'] for obj in objects]
|
||||
|
||||
def update_subnet(self, context, id, subnet):
|
||||
session = context.session
|
||||
with session.begin(subtransactions=True):
|
||||
|
@ -780,8 +843,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
# the fact that an error occurred.
|
||||
LOG.error(_LE("mechanism_manager.delete_subnet_postcommit failed"))
|
||||
|
||||
def create_port(self, context, port):
|
||||
attrs = port['port']
|
||||
def _create_port_db(self, context, port):
|
||||
attrs = port[attributes.PORT]
|
||||
attrs['status'] = const.PORT_STATUS_DOWN
|
||||
|
||||
session = context.session
|
||||
|
@ -796,7 +859,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
binding = db.add_port_binding(session, result['id'])
|
||||
mech_context = driver_context.PortContext(self, context, result,
|
||||
network, binding)
|
||||
new_host_port = self._get_host_port_if_changed(mech_context, attrs)
|
||||
|
||||
self._process_port_binding(mech_context, attrs)
|
||||
|
||||
result[addr_pair.ADDRESS_PAIRS] = (
|
||||
|
@ -807,7 +870,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
dhcp_opts)
|
||||
self.mechanism_manager.create_port_precommit(mech_context)
|
||||
|
||||
# Notification must be sent after the above transaction is complete
|
||||
return result, mech_context
|
||||
|
||||
def create_port(self, context, port):
|
||||
attrs = port['port']
|
||||
result, mech_context = self._create_port_db(context, port)
|
||||
new_host_port = self._get_host_port_if_changed(mech_context, attrs)
|
||||
self._notify_l3_agent_new_port(context, new_host_port)
|
||||
|
||||
try:
|
||||
|
@ -831,6 +899,35 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self.delete_port(context, result['id'])
|
||||
return bound_context._port
|
||||
|
||||
def create_port_bulk(self, context, ports):
|
||||
objects = self._create_bulk_ml2(attributes.PORT, context, ports)
|
||||
|
||||
for obj in objects:
|
||||
# REVISIT(rkukura): Is there any point in calling this before
|
||||
# a binding has been successfully established?
|
||||
# TODO(banix): Use a single notification for all objects
|
||||
self.notify_security_groups_member_updated(context,
|
||||
obj['result'])
|
||||
|
||||
attrs = obj['attributes']
|
||||
if attrs and attrs.get(portbindings.HOST_ID):
|
||||
new_host_port = self._get_host_port_if_changed(
|
||||
obj['mech_context'], attrs)
|
||||
self._notify_l3_agent_new_port(context, new_host_port)
|
||||
|
||||
try:
|
||||
for obj in objects:
|
||||
obj['bound_context'] = self._bind_port_if_needed(
|
||||
obj['mech_context'])
|
||||
return [obj['bound_context']._port for obj in objects]
|
||||
except ml2_exc.MechanismDriverError:
|
||||
with excutils.save_and_reraise_exception():
|
||||
resource_ids = [res['result']['id'] for res in objects]
|
||||
LOG.error(_LE("_bind_port_if_needed failed. "
|
||||
"Deleting all ports from create bulk '%s'"),
|
||||
resource_ids)
|
||||
self._delete_objects(context, 'port', objects)
|
||||
|
||||
def update_port(self, context, id, port):
|
||||
attrs = port['port']
|
||||
need_port_update_notify = False
|
||||
|
|
|
@ -309,7 +309,7 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
|
|||
plugin_obj = manager.NeutronManager.get_plugin()
|
||||
orig = plugin_obj.create_port
|
||||
with mock.patch.object(plugin_obj,
|
||||
'create_port') as patched_plugin:
|
||||
'_create_port_db') as patched_plugin:
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
return self._fail_second_call(patched_plugin, orig,
|
||||
|
@ -343,7 +343,7 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
|
|||
plugin_obj = manager.NeutronManager.get_plugin()
|
||||
orig = plugin_obj.create_port
|
||||
with mock.patch.object(plugin_obj,
|
||||
'create_port') as patched_plugin:
|
||||
'_create_port_db') as patched_plugin:
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
return self._fail_second_call(patched_plugin, orig,
|
||||
|
@ -718,7 +718,7 @@ class TestCiscoNetworksV2(CiscoML2MechanismTestCase,
|
|||
with mock.patch('__builtin__.hasattr',
|
||||
new=fakehasattr):
|
||||
with mock.patch.object(plugin_obj,
|
||||
'create_network') as patched_plugin:
|
||||
'_create_network_db') as patched_plugin:
|
||||
def side_effect(*args, **kwargs):
|
||||
return self._fail_second_call(patched_plugin, orig,
|
||||
*args, **kwargs)
|
||||
|
@ -737,7 +737,7 @@ class TestCiscoNetworksV2(CiscoML2MechanismTestCase,
|
|||
plugin_obj = manager.NeutronManager.get_plugin()
|
||||
orig = plugin_obj.create_network
|
||||
with mock.patch.object(plugin_obj,
|
||||
'create_network') as patched_plugin:
|
||||
'_create_network_db') as patched_plugin:
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
return self._fail_second_call(patched_plugin, orig,
|
||||
|
@ -769,7 +769,7 @@ class TestCiscoSubnetsV2(CiscoML2MechanismTestCase,
|
|||
plugin_obj = manager.NeutronManager.get_plugin()
|
||||
orig = plugin_obj.create_subnet
|
||||
with mock.patch.object(plugin_obj,
|
||||
'create_subnet') as patched_plugin:
|
||||
'_create_subnet_db') as patched_plugin:
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
self._fail_second_call(patched_plugin, orig,
|
||||
|
@ -792,7 +792,7 @@ class TestCiscoSubnetsV2(CiscoML2MechanismTestCase,
|
|||
plugin_obj = manager.NeutronManager.get_plugin()
|
||||
orig = plugin_obj.create_subnet
|
||||
with mock.patch.object(plugin_obj,
|
||||
'create_subnet') as patched_plugin:
|
||||
'_create_subnet_db') as patched_plugin:
|
||||
def side_effect(*args, **kwargs):
|
||||
return self._fail_second_call(patched_plugin, orig,
|
||||
*args, **kwargs)
|
||||
|
|
|
@ -1,23 +0,0 @@
|
|||
# Copyright (c) 2014 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutron.plugins.ml2 import driver_api as api
|
||||
|
||||
|
||||
class BulklessMechanismDriver(api.MechanismDriver):
|
||||
"""Test mechanism driver for testing bulk emulation."""
|
||||
|
||||
def initialize(self):
|
||||
self.native_bulk_support = False
|
|
@ -90,14 +90,6 @@ class Ml2PluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
|
|||
self.context = context.get_admin_context()
|
||||
|
||||
|
||||
class TestMl2BulkToggleWithBulkless(Ml2PluginV2TestCase):
|
||||
|
||||
_mechanism_drivers = ['logger', 'test', 'bulkless']
|
||||
|
||||
def test_bulk_disable_with_bulkless_driver(self):
|
||||
self.assertTrue(self._skip_native_bulk)
|
||||
|
||||
|
||||
class TestMl2BulkToggleWithoutBulkless(Ml2PluginV2TestCase):
|
||||
|
||||
_mechanism_drivers = ['logger', 'test']
|
||||
|
@ -166,6 +158,23 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
|
|||
flips = l3plugin.get_floatingips(context.get_admin_context())
|
||||
self.assertFalse(flips)
|
||||
|
||||
def test_create_ports_bulk_port_binding_failure(self):
|
||||
ctx = context.get_admin_context()
|
||||
with self.network() as net:
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
|
||||
with mock.patch.object(plugin, '_bind_port_if_needed',
|
||||
side_effect=ml2_exc.MechanismDriverError(
|
||||
method='create_port_bulk')) as _bind_port_if_needed:
|
||||
|
||||
res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
|
||||
'test', True, context=ctx)
|
||||
|
||||
self.assertTrue(_bind_port_if_needed.called)
|
||||
# We expect a 500 as we injected a fault in the plugin
|
||||
self._validate_behavior_on_bulk_failure(
|
||||
res, 'ports', webob.exc.HTTPServerError.code)
|
||||
|
||||
def test_delete_port_no_notify_in_disassociate_floatingips(self):
|
||||
ctx = context.get_admin_context()
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
|
|
|
@ -63,6 +63,17 @@ def _fake_get_sorting_helper(self, request):
|
|||
return api_common.SortingEmulatedHelper(request, self._attr_info)
|
||||
|
||||
|
||||
# TODO(banix): Move the following method to ML2 db test module when ML2
|
||||
# mechanism driver unit tests are corrected to use Ml2PluginV2TestCase
|
||||
# instead of directly using NeutronDbPluginV2TestCase
|
||||
def _get_create_db_method(resource):
|
||||
ml2_method = '_create_%s_db' % resource
|
||||
if hasattr(manager.NeutronManager.get_plugin(), ml2_method):
|
||||
return ml2_method
|
||||
else:
|
||||
return 'create_%s' % resource
|
||||
|
||||
|
||||
class NeutronDbPluginV2TestCase(testlib_api.WebTestCase,
|
||||
testlib_plugin.PluginSetupHelper):
|
||||
fmt = 'json'
|
||||
|
@ -883,8 +894,9 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
|
|||
with mock.patch('__builtin__.hasattr',
|
||||
new=fakehasattr):
|
||||
orig = manager.NeutronManager.get_plugin().create_port
|
||||
method_to_patch = _get_create_db_method('port')
|
||||
with mock.patch.object(manager.NeutronManager.get_plugin(),
|
||||
'create_port') as patched_plugin:
|
||||
method_to_patch) as patched_plugin:
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
return self._fail_second_call(patched_plugin, orig,
|
||||
|
@ -908,7 +920,8 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
|
|||
with self.network() as net:
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
orig = plugin.create_port
|
||||
with mock.patch.object(plugin, 'create_port') as patched_plugin:
|
||||
method_to_patch = _get_create_db_method('port')
|
||||
with mock.patch.object(plugin, method_to_patch) as patched_plugin:
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
return self._fail_second_call(patched_plugin, orig,
|
||||
|
@ -2073,8 +2086,9 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
|
|||
#ensures the API choose the emulation code path
|
||||
with mock.patch('__builtin__.hasattr',
|
||||
new=fakehasattr):
|
||||
method_to_patch = _get_create_db_method('network')
|
||||
with mock.patch.object(manager.NeutronManager.get_plugin(),
|
||||
'create_network') as patched_plugin:
|
||||
method_to_patch) as patched_plugin:
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
return self._fail_second_call(patched_plugin, orig,
|
||||
|
@ -2091,8 +2105,9 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
|
|||
if self._skip_native_bulk:
|
||||
self.skipTest("Plugin does not support native bulk network create")
|
||||
orig = manager.NeutronManager.get_plugin().create_network
|
||||
method_to_patch = _get_create_db_method('network')
|
||||
with mock.patch.object(manager.NeutronManager.get_plugin(),
|
||||
'create_network') as patched_plugin:
|
||||
method_to_patch) as patched_plugin:
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
return self._fail_second_call(patched_plugin, orig,
|
||||
|
@ -2513,8 +2528,9 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
|
|||
with mock.patch('__builtin__.hasattr',
|
||||
new=fakehasattr):
|
||||
orig = manager.NeutronManager.get_plugin().create_subnet
|
||||
method_to_patch = _get_create_db_method('subnet')
|
||||
with mock.patch.object(manager.NeutronManager.get_plugin(),
|
||||
'create_subnet') as patched_plugin:
|
||||
method_to_patch) as patched_plugin:
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
self._fail_second_call(patched_plugin, orig,
|
||||
|
@ -2536,7 +2552,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
|
|||
self.skipTest("Plugin does not support native bulk subnet create")
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
orig = plugin.create_subnet
|
||||
with mock.patch.object(plugin, 'create_subnet') as patched_plugin:
|
||||
method_to_patch = _get_create_db_method('subnet')
|
||||
with mock.patch.object(plugin, method_to_patch) as patched_plugin:
|
||||
def side_effect(*args, **kwargs):
|
||||
return self._fail_second_call(patched_plugin, orig,
|
||||
*args, **kwargs)
|
||||
|
|
|
@ -173,7 +173,6 @@ neutron.ml2.mechanism_drivers =
|
|||
opendaylight = neutron.plugins.ml2.drivers.mechanism_odl:OpenDaylightMechanismDriver
|
||||
logger = neutron.tests.unit.ml2.drivers.mechanism_logger:LoggerMechanismDriver
|
||||
test = neutron.tests.unit.ml2.drivers.mechanism_test:TestMechanismDriver
|
||||
bulkless = neutron.tests.unit.ml2.drivers.mechanism_bulkless:BulklessMechanismDriver
|
||||
linuxbridge = neutron.plugins.ml2.drivers.mech_linuxbridge:LinuxbridgeMechanismDriver
|
||||
openvswitch = neutron.plugins.ml2.drivers.mech_openvswitch:OpenvswitchMechanismDriver
|
||||
hyperv = neutron.plugins.ml2.drivers.mech_hyperv:HypervMechanismDriver
|
||||
|
|
Loading…
Reference in New Issue