Utilize bulk port creation ops in ml2 plugin

Rather than iterating through all ports and leveraging the heavyweight
ml2 method, use a set of optimized functions that attempts to make bulk
port operations as speedy as possible.

The test test_bulk_ports_before_and_after_events_outside_of_txn is
deleted because with the change to using port OVO, the session will
not be closed, and it is not correct to check for it.

Change-Id: Ieea0e6074cd31a2d09ae92f1f3c8d375c6d8ecc2
Implements: blueprint speed-up-neutron-bulk-creation
This commit is contained in:
Nate Johnston 2018-12-12 16:33:50 -05:00
parent b6550e568d
commit 2dc61dfbcc
9 changed files with 231 additions and 176 deletions

View File

@ -207,7 +207,7 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
'name': port['name'], 'name': port['name'],
"network_id": port["network_id"], "network_id": port["network_id"],
'tenant_id': port['tenant_id'], 'tenant_id': port['tenant_id'],
"mac_address": port["mac_address"], "mac_address": str(port["mac_address"]),
"admin_state_up": port["admin_state_up"], "admin_state_up": port["admin_state_up"],
"status": port["status"], "status": port["status"],
"fixed_ips": [{'subnet_id': ip["subnet_id"], "fixed_ips": [{'subnet_id': ip["subnet_id"],
@ -217,7 +217,11 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
"device_owner": port["device_owner"]} "device_owner": port["device_owner"]}
# Call auxiliary extend functions, if any # Call auxiliary extend functions, if any
if process_extensions: if process_extensions:
resource_extend.apply_funcs(port_def.COLLECTION_NAME, res, port) port_data = port
if isinstance(port, port_obj.Port):
port_data = port.db_obj
resource_extend.apply_funcs(
port_def.COLLECTION_NAME, res, port_data)
return db_utils.resource_fields(res, fields) return db_utils.resource_fields(res, fields)
def _get_network(self, context, id): def _get_network(self, context, id):

View File

@ -60,7 +60,6 @@ from neutron.objects import network as network_obj
from neutron.objects import ports as port_obj from neutron.objects import ports as port_obj
from neutron.objects import subnet as subnet_obj from neutron.objects import subnet as subnet_obj
from neutron.objects import subnetpool as subnetpool_obj from neutron.objects import subnetpool as subnetpool_obj
from neutron.plugins.ml2.common import exceptions as ml2_exceptions
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -1325,28 +1324,6 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
op=_("mac address update"), port_id=id, op=_("mac address update"), port_id=id,
device_owner=device_owner) device_owner=device_owner)
@db_api.retry_if_session_inactive()
def create_port_bulk(self, context, ports):
return self._create_bulk('port', context, ports)
def _create_db_port_obj_bulk(self, context, port_data):
db_ports = []
macs = self._generate_macs(len(port_data))
with db_api.CONTEXT_WRITER.using(context):
for port in port_data:
raw_mac_address = port.pop('mac_address',
constants.ATTR_NOT_SPECIFIED)
if raw_mac_address is constants.ATTR_NOT_SPECIFIED:
raw_mac_address = macs.pop()
eui_mac_address = netaddr.EUI(raw_mac_address, 48)
db_port_obj = port_obj.Port(context,
mac_address=eui_mac_address,
id=uuidutils.generate_uuid(),
**port)
db_port_obj.create()
db_ports.append(db_port_obj)
return db_ports
def _create_db_port_obj(self, context, port_data): def _create_db_port_obj(self, context, port_data):
mac_address = port_data.pop('mac_address', None) mac_address = port_data.pop('mac_address', None)
if mac_address: if mac_address:
@ -1365,49 +1342,9 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
db_port = self.create_port_db(context, port) db_port = self.create_port_db(context, port)
return self._make_port_dict(db_port, process_extensions=False) return self._make_port_dict(db_port, process_extensions=False)
def create_port_obj_bulk(self, context, ports): @db_api.retry_if_session_inactive()
bulk_port_data = [] def create_port_bulk(self, context, ports):
network_ids = set() return self._create_bulk('port', context, ports)
with db_api.CONTEXT_WRITER.using(context):
for port in ports:
fixed_ips = port['port'].get('fixed_ips')
if fixed_ips is not constants.ATTR_NOT_SPECIFIED:
raise ml2_exceptions.BulkPortCannotHaveFixedIpError()
pdata = port['port']
if pdata.get('device_owner'):
self._enforce_device_owner_not_router_intf_or_device_id(
context, pdata.get('device_owner'),
pdata.get('device_id'), pdata.get('tenant_id'))
bulk_port_data.append(dict(
project_id=pdata.get('project_id'),
name=pdata.get('name'),
network_id=pdata.get('network_id'),
admin_state_up=pdata.get('admin_state_up'),
status=pdata.get('status', constants.PORT_STATUS_ACTIVE),
mac_address=pdata.get('mac_address'),
device_id=pdata.get('device_id'),
device_owner=pdata.get('device_owner'),
description=pdata.get('description')))
# Ensure that the networks exist.
network_id = pdata.get('network_id')
if network_id not in network_ids:
self._get_network(context, network_id)
network_ids.add(network_id)
db_ports = self._create_db_port_obj_bulk(context, bulk_port_data)
for db_port in db_ports:
try:
self.ipam.allocate_ips_for_port_and_store(
context, db_port, db_port['id'])
db_port['ip_allocation'] = (ipalloc_apidef.
IP_ALLOCATION_IMMEDIATE)
except ipam_exc.DeferIpam:
db_port['ip_allocation'] = (ipalloc_apidef.
IP_ALLOCATION_DEFERRED)
return db_ports
def create_port_db(self, context, port): def create_port_db(self, context, port):
p = port['port'] p = port['port']

View File

@ -19,6 +19,7 @@ from neutron_lib.db import api as db_api
from neutron_lib.db import resource_extend from neutron_lib.db import resource_extend
from neutron.objects.port.extensions import extra_dhcp_opt as obj_extra_dhcp from neutron.objects.port.extensions import extra_dhcp_opt as obj_extra_dhcp
from neutron.objects import ports as port_obj
@resource_extend.has_resource_extenders @resource_extend.has_resource_extenders
@ -119,8 +120,12 @@ class ExtraDhcpOptMixin(object):
@staticmethod @staticmethod
@resource_extend.extends([port_def.COLLECTION_NAME]) @resource_extend.extends([port_def.COLLECTION_NAME])
def _extend_port_dict_extra_dhcp_opt(res, port): def _extend_port_dict_extra_dhcp_opt(res, port):
if isinstance(port, port_obj.Port):
port_dhcp_options = port.get('dhcp_options')
else:
port_dhcp_options = port.dhcp_opts
res[edo_ext.EXTRADHCPOPTS] = [{'opt_name': dho.opt_name, res[edo_ext.EXTRADHCPOPTS] = [{'opt_name': dho.opt_name,
'opt_value': dho.opt_value, 'opt_value': dho.opt_value,
'ip_version': dho.ip_version} 'ip_version': dho.ip_version}
for dho in port.dhcp_opts] for dho in port_dhcp_options]
return res return res

View File

@ -183,11 +183,14 @@ class IpamPluggableBackend(ipam_backend_mixin.IpamBackendMixin):
# match original object, so 'is' check fails # match original object, so 'is' check fails
# TODO(njohnston): Different behavior is required depending on whether # TODO(njohnston): Different behavior is required depending on whether
# a Port object is used or not; once conversion to OVO is complete only # a Port object is used or not; once conversion to OVO is complete only
# the 'else' will be needed. # the first 'if' will be needed
if isinstance(port, dict): if isinstance(port, port_obj.Port):
port_copy = {"port": self._make_port_dict(
port, process_extensions=False)}
elif 'port' in port:
port_copy = {'port': port['port'].copy()} port_copy = {'port': port['port'].copy()}
else: else:
port_copy = {'port': port.to_dict()} port_copy = {'port': port.copy()}
port_copy['port']['id'] = port_id port_copy['port']['id'] = port_id
network_id = port_copy['port']['network_id'] network_id = port_copy['port']['network_id']

View File

@ -38,6 +38,7 @@ from neutron.db.models import securitygroup as sg_models
from neutron.db import rbac_db_mixin as rbac_mixin from neutron.db import rbac_db_mixin as rbac_mixin
from neutron.extensions import securitygroup as ext_sg from neutron.extensions import securitygroup as ext_sg
from neutron.objects import base as base_obj from neutron.objects import base as base_obj
from neutron.objects import ports as port_obj
from neutron.objects import securitygroup as sg_obj from neutron.objects import securitygroup as sg_obj
@ -732,9 +733,12 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
# Security group bindings will be retrieved from the SQLAlchemy # Security group bindings will be retrieved from the SQLAlchemy
# model. As they're loaded eagerly with ports because of the # model. As they're loaded eagerly with ports because of the
# joined load they will not cause an extra query. # joined load they will not cause an extra query.
security_group_ids = [sec_group_mapping['security_group_id'] for if isinstance(port_db, port_obj.Port):
sec_group_mapping in port_db.security_groups] port_res[ext_sg.SECURITYGROUPS] = port_db.security_group_ids
port_res[ext_sg.SECURITYGROUPS] = security_group_ids else:
security_group_ids = [sec_group_mapping['security_group_id'] for
sec_group_mapping in port_db.security_groups]
port_res[ext_sg.SECURITYGROUPS] = security_group_ids
return port_res return port_res
def _process_port_create_security_group(self, context, port, def _process_port_create_security_group(self, context, port,
@ -820,9 +824,11 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
port = port['port'] port = port['port']
if port.get('device_owner') and net.is_port_trusted(port): if port.get('device_owner') and net.is_port_trusted(port):
return return
if not validators.is_attr_set(port.get(ext_sg.SECURITYGROUPS)): port_sg = port.get(ext_sg.SECURITYGROUPS)
if port_sg is None or not validators.is_attr_set(port_sg):
port_project = port.get('tenant_id')
default_sg = self._ensure_default_security_group(context, default_sg = self._ensure_default_security_group(context,
port['tenant_id']) port_project)
port[ext_sg.SECURITYGROUPS] = [default_sg] port[ext_sg.SECURITYGROUPS] = [default_sg]
def _check_update_deletes_security_groups(self, port): def _check_update_deletes_security_groups(self, port):

View File

@ -42,12 +42,6 @@ class ExtensionDriverNotFound(exceptions.InvalidConfigurationOption):
"service plugin %(service_plugin)s not found.") "service plugin %(service_plugin)s not found.")
class BulkPortCannotHaveFixedIpError(exceptions.InvalidInput):
"""You cannot request fixed IP addresses in a bulk port request."""
message = _("Fixed IP addresses cannot be requested from a bulk port "
"allocation.")
class UnknownNetworkType(exceptions.NeutronException): class UnknownNetworkType(exceptions.NeutronException):
"""Network with unknown type.""" """Network with unknown type."""
message = _("Unknown network type %(network_type)s.") message = _("Unknown network type %(network_type)s.")

View File

@ -14,6 +14,7 @@
# under the License. # under the License.
from eventlet import greenthread from eventlet import greenthread
import netaddr
from neutron_lib.agent import constants as agent_consts from neutron_lib.agent import constants as agent_consts
from neutron_lib.agent import topics from neutron_lib.agent import topics
from neutron_lib.api.definitions import address_scope from neutron_lib.api.definitions import address_scope
@ -28,6 +29,7 @@ from neutron_lib.api.definitions import empty_string_filtering
from neutron_lib.api.definitions import external_net from neutron_lib.api.definitions import external_net
from neutron_lib.api.definitions import extra_dhcp_opt as edo_ext from neutron_lib.api.definitions import extra_dhcp_opt as edo_ext
from neutron_lib.api.definitions import filter_validation as filter_apidef from neutron_lib.api.definitions import filter_validation as filter_apidef
from neutron_lib.api.definitions import ip_allocation as ipalloc_apidef
from neutron_lib.api.definitions import ip_substring_port_filtering from neutron_lib.api.definitions import ip_substring_port_filtering
from neutron_lib.api.definitions import multiprovidernet from neutron_lib.api.definitions import multiprovidernet
from neutron_lib.api.definitions import network as net_def from neutron_lib.api.definitions import network as net_def
@ -107,6 +109,7 @@ from neutron.db import vlantransparent_db
from neutron.extensions import filter_validation from neutron.extensions import filter_validation
from neutron.extensions import providernet as provider from neutron.extensions import providernet as provider
from neutron.extensions import vlantransparent from neutron.extensions import vlantransparent
from neutron.ipam import exceptions as ipam_exc
from neutron.objects import base as base_obj from neutron.objects import base as base_obj
from neutron.objects import ports as ports_obj from neutron.objects import ports as ports_obj
from neutron.plugins.ml2.common import exceptions as ml2_exc from neutron.plugins.ml2.common import exceptions as ml2_exc
@ -738,8 +741,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@resource_extend.extends([port_def.COLLECTION_NAME]) @resource_extend.extends([port_def.COLLECTION_NAME])
def _ml2_extend_port_dict_binding(port_res, port_db): def _ml2_extend_port_dict_binding(port_res, port_db):
plugin = directory.get_plugin() plugin = directory.get_plugin()
if isinstance(port_db, ports_obj.Port):
bindings = port_db.bindings
else:
bindings = port_db.port_bindings
port_binding = p_utils.get_port_binding_by_status_and_host( port_binding = p_utils.get_port_binding_by_status_and_host(
port_db.port_bindings, const.ACTIVE) bindings, const.ACTIVE)
# None when called during unit tests for other plugins. # None when called during unit tests for other plugins.
if port_binding: if port_binding:
plugin._update_port_dict_binding(port_res, port_binding) plugin._update_port_dict_binding(port_res, port_binding)
@ -1354,8 +1361,135 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@utils.transaction_guard @utils.transaction_guard
@db_api.retry_if_session_inactive() @db_api.retry_if_session_inactive()
def create_port_bulk(self, context, ports): def create_port_bulk(self, context, ports):
objects = self._create_bulk_ml2(port_def.RESOURCE_NAME, context, ports) # TODO(njohnston): Break this up into smaller functions.
return [obj['result'] for obj in objects] port_list = ports.get('ports')
for port in port_list:
self._before_create_port(context, port)
port_data = []
network_cache = dict()
macs = self._generate_macs(len(port_list))
with db_api.CONTEXT_WRITER.using(context):
for port in port_list:
# Set up the port request dict
pdata = port.get('port')
if pdata.get('device_owner'):
self._enforce_device_owner_not_router_intf_or_device_id(
context, pdata.get('device_owner'),
pdata.get('device_id'), pdata.get('tenant_id'))
bulk_port_data = dict(project_id=pdata.get('project_id'),
name=pdata.get('name'),
network_id=pdata.get('network_id'),
admin_state_up=pdata.get('admin_state_up'),
status=pdata.get('status',
const.PORT_STATUS_ACTIVE),
device_id=pdata.get('device_id'),
device_owner=pdata.get('device_owner'),
security_groups=pdata.get('security_groups'),
description=pdata.get('description'))
# Ensure that the networks exist.
network_id = pdata.get('network_id')
if network_id not in network_cache:
network = self.get_network(context, network_id)
network_cache[network_id] = network
else:
network = network_cache[network_id]
# Determine the MAC address
raw_mac_address = pdata.get('mac_address',
const.ATTR_NOT_SPECIFIED)
if raw_mac_address is const.ATTR_NOT_SPECIFIED:
raw_mac_address = macs.pop()
elif self._is_mac_in_use(context, network_id, raw_mac_address):
raise exc.MacAddressInUse(net_id=network_id,
mac=raw_mac_address)
eui_mac_address = netaddr.EUI(raw_mac_address, 48)
# Create the Port object
db_port_obj = ports_obj.Port(context,
mac_address=eui_mac_address,
id=uuidutils.generate_uuid(),
**bulk_port_data)
db_port_obj.create()
port_dict = self._make_port_dict(db_port_obj,
process_extensions=False)
port_compat = {'port': port_dict}
# Call IPAM to allocate IP addresses
try:
# TODO(njohnston): IPAM allocation needs to be revamped to
# be bulk-friendly.
self.ipam.allocate_ips_for_port_and_store(
context, db_port_obj, db_port_obj['id'])
db_port_obj['ip_allocation'] = (ipalloc_apidef.
IP_ALLOCATION_IMMEDIATE)
except ipam_exc.DeferIpam:
db_port_obj['ip_allocation'] = (ipalloc_apidef.
IP_ALLOCATION_DEFERRED)
fixed_ips = pdata.get('fixed_ips')
if validators.is_attr_set(fixed_ips) and not fixed_ips:
# [] was passed explicitly as fixed_ips: unaddressed port.
db_port_obj['ip_allocation'] = (ipalloc_apidef.
IP_ALLOCATION_NONE)
# Activities immediately post-port-creation
self.extension_manager.process_create_port(context, port_dict,
db_port_obj)
self._portsec_ext_port_create_processing(context, port_dict,
port_compat)
# sgids must be got after portsec checked with security group
sgids = self._get_security_groups_on_port(context, port_compat)
self._process_port_create_security_group(context, port_dict,
sgids)
# process port binding
binding = db.add_port_binding(context, port_dict['id'])
mech_context = driver_context.PortContext(self, context,
port_dict, network,
binding, None)
self._process_port_binding(mech_context, port_dict)
# process allowed address pairs
db_port_obj[addr_apidef.ADDRESS_PAIRS] = (
self._process_create_allowed_address_pairs(
context, port_compat,
port_dict.get(addr_apidef.ADDRESS_PAIRS)))
# handle DHCP setup
dhcp_opts = port_dict.get(edo_ext.EXTRADHCPOPTS, [])
self._process_port_create_extra_dhcp_opts(context, port_dict,
dhcp_opts)
# send PRECOMMIT_CREATE notification
kwargs = {'context': context, 'port': db_port_obj}
registry.notify(
resources.PORT, events.PRECOMMIT_CREATE, self, **kwargs)
self.mechanism_manager.create_port_precommit(mech_context)
# handle DHCP agent provisioning
self._setup_dhcp_agent_provisioning_component(context,
port_dict)
port_data.append(
{
'id': db_port_obj['id'],
'port_obj': db_port_obj,
'mech_context': mech_context,
'port_dict': port_dict
})
# Perform actions after the transaction is committed
completed_ports = []
for port in port_data:
resource_extend.apply_funcs('ports',
port['port_dict'],
port['port_obj'].db_obj)
completed_ports.append(
self._after_create_port(context,
port['port_dict'],
port['mech_context']))
return completed_ports
# TODO(yalei) - will be simplified after security group and address pair be # TODO(yalei) - will be simplified after security group and address pair be
# converted to ext driver too. # converted to ext driver too.

View File

@ -21,7 +21,6 @@ import itertools
import eventlet import eventlet
import mock import mock
import netaddr import netaddr
from neutron_lib.api import validators
from neutron_lib.callbacks import exceptions from neutron_lib.callbacks import exceptions
from neutron_lib.callbacks import registry from neutron_lib.callbacks import registry
from neutron_lib import constants from neutron_lib import constants
@ -61,7 +60,6 @@ from neutron.ipam.drivers.neutrondb_ipam import driver as ipam_driver
from neutron.ipam import exceptions as ipam_exc from neutron.ipam import exceptions as ipam_exc
from neutron.objects import network as network_obj from neutron.objects import network as network_obj
from neutron.objects import router as l3_obj from neutron.objects import router as l3_obj
from neutron.plugins.ml2.common import exceptions as ml2_exceptions
from neutron import policy from neutron import policy
from neutron.tests import base from neutron.tests import base
from neutron.tests.unit.api import test_extensions from neutron.tests.unit.api import test_extensions
@ -616,7 +614,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _fail_second_call(self, patched_plugin, orig, *args, **kwargs): def _fail_second_call(self, patched_plugin, orig, *args, **kwargs):
"""Invoked by test cases for injecting failures in plugin.""" """Invoked by test cases for injecting failures in plugin."""
def second_call(*args, **kwargs): def second_call(*args, **kwargs):
raise lib_exc.NeutronException() raise lib_exc.NeutronException(message="_fail_second_call")
patched_plugin.side_effect = second_call patched_plugin.side_effect = second_call
return orig(*args, **kwargs) return orig(*args, **kwargs)
@ -1145,10 +1143,14 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
with mock.patch('six.moves.builtins.hasattr', with mock.patch('six.moves.builtins.hasattr',
new=fakehasattr): new=fakehasattr):
orig = directory.get_plugin().create_port plugin = directory.get_plugin()
method_to_patch = _get_create_db_method('port') method_to_patch = '_process_port_binding'
with mock.patch.object(directory.get_plugin(), if real_has_attr(plugin, method_to_patch):
method_to_patch) as patched_plugin: orig = plugin._process_port_binding
else:
method_to_patch = '_make_port_dict'
orig = plugin._make_port_dict
with mock.patch.object(plugin, method_to_patch) as patched_plugin:
def side_effect(*args, **kwargs): def side_effect(*args, **kwargs):
return self._fail_second_call(patched_plugin, orig, return self._fail_second_call(patched_plugin, orig,
@ -1172,7 +1174,7 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
with self.network() as net: with self.network() as net:
plugin = directory.get_plugin() plugin = directory.get_plugin()
orig = plugin.create_port orig = plugin.create_port
method_to_patch = _get_create_db_method('port') method_to_patch = _get_create_db_method('port_bulk')
with mock.patch.object(plugin, method_to_patch) as patched_plugin: with mock.patch.object(plugin, method_to_patch) as patched_plugin:
def side_effect(*args, **kwargs): def side_effect(*args, **kwargs):
@ -2614,58 +2616,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
plugin = directory.get_plugin() plugin = directory.get_plugin()
self._test_delete_ports_ignores_port_not_found(plugin) self._test_delete_ports_ignores_port_not_found(plugin)
def test_create_port_obj_bulk(self):
cfg.CONF.set_override('base_mac', "12:34:56:00")
test_mac = "00-12-34-56-78-90"
num_ports = 4
plugin = directory.get_plugin()
tenant_id = 'some_tenant'
device_owner = "me"
ctx = context.Context('', tenant_id)
with self.network(tenant_id=tenant_id) as network_to_use:
net_id = network_to_use['network']['id']
port = {'port': {'name': 'port',
'network_id': net_id,
'mac_address': constants.ATTR_NOT_SPECIFIED,
'fixed_ips': constants.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
'device_id': 'device_id',
'device_owner': device_owner,
'tenant_id': tenant_id}}
ports = [copy.deepcopy(port) for x in range(num_ports)]
ports[1]['port']['mac_address'] = test_mac
port_data = plugin.create_port_obj_bulk(ctx, ports)
self.assertEqual(num_ports, len(port_data))
result_macs = []
for port in port_data:
port_mac = str(port.get('mac_address'))
self.assertIsNone(validators.validate_mac_address(port_mac))
result_macs.append(port_mac)
for ip_addr in port.get('fixed_ips'):
self.assertIsNone(validators.validate_ip_address(ip_addr))
self.assertTrue(test_mac in result_macs)
def test_create_port_obj_bulk_with_fixed_ips(self):
num_ports = 4
plugin = directory.get_plugin()
tenant_id = 'some_tenant'
device_owner = "me"
ctx = context.Context('', tenant_id)
with self.network(tenant_id=tenant_id) as network_to_use:
net_id = network_to_use['network']['id']
fixed_ip = [dict(ip_address='10.0.0.5')]
port = {'port': {'name': 'port',
'network_id': net_id,
'mac_address': constants.ATTR_NOT_SPECIFIED,
'fixed_ips': fixed_ip,
'admin_state_up': True,
'device_id': 'device_id',
'device_owner': device_owner,
'tenant_id': tenant_id}}
ports = [port for x in range(num_ports)]
self.assertRaises(ml2_exceptions.BulkPortCannotHaveFixedIpError,
plugin.create_port_obj_bulk, ctx, ports)
class TestNetworksV2(NeutronDbPluginV2TestCase): class TestNetworksV2(NeutronDbPluginV2TestCase):
# NOTE(cerberus): successful network update and delete are # NOTE(cerberus): successful network update and delete are

View File

@ -13,7 +13,9 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import copy
import functools import functools
import weakref
import fixtures import fixtures
import mock import mock
@ -24,6 +26,7 @@ from neutron_lib.api.definitions import external_net as extnet_apidef
from neutron_lib.api.definitions import multiprovidernet as mpnet_apidef from neutron_lib.api.definitions import multiprovidernet as mpnet_apidef
from neutron_lib.api.definitions import portbindings from neutron_lib.api.definitions import portbindings
from neutron_lib.api.definitions import provider_net as pnet from neutron_lib.api.definitions import provider_net as pnet
from neutron_lib.api import validators
from neutron_lib.callbacks import events from neutron_lib.callbacks import events
from neutron_lib.callbacks import exceptions as c_exc from neutron_lib.callbacks import exceptions as c_exc
from neutron_lib.callbacks import registry from neutron_lib.callbacks import registry
@ -283,6 +286,43 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
self.assertEqual(n['network']['id'], self.assertEqual(n['network']['id'],
kwargs['network']['id']) kwargs['network']['id'])
def test_create_port_obj_bulk(self):
cfg.CONF.set_override('base_mac', "12:34:56:00")
test_mac = "00-12-34-56-78-90"
num_ports = 4
plugin = directory.get_plugin()
# Most of the plugin methods are undefined in a weakproxy. This is not
# the case most fo the time - Ml2Plugin is typically the plugin here -
# but the IPAM classes that inherit this test have a weakproxy here and
# thus fail. This avoids that error.
if isinstance(plugin, weakref.ProxyTypes):
self.skipTest("Bulk port method tests do not apply to IPAM plugin")
tenant_id = 'some_tenant'
device_owner = "me"
ctx = context.Context('', tenant_id)
with self.network(tenant_id=tenant_id) as network_to_use:
net_id = network_to_use['network']['id']
port = {'port': {'name': 'port',
'network_id': net_id,
'mac_address': constants.ATTR_NOT_SPECIFIED,
'fixed_ips': constants.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
'device_id': 'device_id',
'device_owner': device_owner,
'tenant_id': tenant_id}}
ports = [copy.deepcopy(port) for x in range(num_ports)]
ports[1]['port']['mac_address'] = test_mac
port_data = plugin.create_port_obj_bulk(ctx, ports)
self.assertEqual(num_ports, len(port_data))
result_macs = []
for port in port_data:
port_mac = str(port.get('mac_address'))
self.assertIsNone(validators.validate_mac_address(port_mac))
result_macs.append(port_mac)
for ip_addr in port.get('fixed_ips'):
self.assertIsNone(validators.validate_ip_address(ip_addr))
self.assertTrue(test_mac in result_macs)
def test_bulk_network_before_and_after_events_outside_of_txn(self): def test_bulk_network_before_and_after_events_outside_of_txn(self):
# capture session states during each before and after event # capture session states during each before and after event
before = [] before = []
@ -886,27 +926,6 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
self._delete('ports', p['port']['id']) self._delete('ports', p['port']['id'])
self.assertFalse(self.tx_open) self.assertFalse(self.tx_open)
def test_bulk_ports_before_and_after_events_outside_of_txn(self):
with self.network() as n:
pass
# capture session states during each before and after event
before = []
after = []
b_func = lambda *a, **k: before.append(k['context'].session.is_active)
a_func = lambda *a, **k: after.append(k['context'].session.is_active)
registry.subscribe(b_func, resources.PORT, events.BEFORE_CREATE)
registry.subscribe(a_func, resources.PORT, events.AFTER_CREATE)
data = [{'tenant_id': self._tenant_id,
'network_id': n['network']['id']}] * 4
self._create_bulk_from_list(
self.fmt, 'port', data, context=context.get_admin_context())
# ensure events captured
self.assertTrue(before)
self.assertTrue(after)
# ensure session was closed for all
self.assertFalse(any(before))
self.assertFalse(any(after))
def test_create_router_port_and_fail_create_postcommit(self): def test_create_router_port_and_fail_create_postcommit(self):
with mock.patch.object(managers.MechanismManager, with mock.patch.object(managers.MechanismManager,
@ -1134,14 +1153,14 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
with self.network() as net: with self.network() as net:
plugin = directory.get_plugin() plugin = directory.get_plugin()
with mock.patch.object(plugin, '_bind_port_if_needed', with mock.patch.object(plugin, '_process_port_binding',
side_effect=ml2_exc.MechanismDriverError( side_effect=ml2_exc.MechanismDriverError(
method='create_port_bulk')) as _bind_port_if_needed: method='create_port_bulk')) as _process_port_binding:
res = self._create_port_bulk(self.fmt, 2, net['network']['id'], res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
'test', True, context=ctx) 'test', True, context=ctx)
self.assertTrue(_process_port_binding.called)
self.assertTrue(_bind_port_if_needed.called)
# We expect a 500 as we injected a fault in the plugin # We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure( self._validate_behavior_on_bulk_failure(
res, 'ports', webob.exc.HTTPServerError.code) res, 'ports', webob.exc.HTTPServerError.code)
@ -1156,16 +1175,19 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
res = self._create_port_bulk(self.fmt, 3, net['network']['id'], res = self._create_port_bulk(self.fmt, 3, net['network']['id'],
'test', True, context=ctx) 'test', True, context=ctx)
ports = self.deserialize(self.fmt, res) ports = self.deserialize(self.fmt, res)
used_sg = ports['ports'][0]['security_groups'] if 'ports' in ports:
m_upd.assert_has_calls( used_sg = ports['ports'][0]['security_groups']
[mock.call(ctx, [sg]) for sg in used_sg], any_order=True) m_upd.assert_has_calls(
[mock.call(ctx, [sg]) for sg in used_sg], any_order=True)
else:
self.assertTrue('ports' in ports)
def test_create_ports_bulk_with_sec_grp_member_provider_update(self): def test_create_ports_bulk_with_sec_grp_member_provider_update(self):
ctx = context.get_admin_context() ctx = context.get_admin_context()
plugin = directory.get_plugin() plugin = directory.get_plugin()
bulk_mock_name = "security_groups_member_updated"
with self.network() as net,\ with self.network() as net,\
mock.patch.object(plugin.notifier, mock.patch.object(plugin.notifier, bulk_mock_name) as m_upd:
'security_groups_member_updated') as m_upd:
net_id = net['network']['id'] net_id = net['network']['id']
data = [{ data = [{
@ -1183,7 +1205,7 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
data, context=ctx) data, context=ctx)
ports = self.deserialize(self.fmt, res) ports = self.deserialize(self.fmt, res)
used_sg = ports['ports'][0]['security_groups'] used_sg = ports['ports'][0]['security_groups']
m_upd.assert_called_once_with(ctx, used_sg) m_upd.assert_called_with(ctx, used_sg)
m_upd.reset_mock() m_upd.reset_mock()
data[0]['device_owner'] = constants.DEVICE_OWNER_DHCP data[0]['device_owner'] = constants.DEVICE_OWNER_DHCP
self._create_bulk_from_list(self.fmt, 'port', self._create_bulk_from_list(self.fmt, 'port',