diff --git a/etc/quantum.conf b/etc/quantum.conf index 97584d271e..ad8e18de2a 100644 --- a/etc/quantum.conf +++ b/etc/quantum.conf @@ -39,6 +39,8 @@ api_paste_config = api-paste.ini # Maximum amount of retries to generate a unique MAC address # mac_generation_retries = 16 +# Enable or disable bulk create/update/delete operations +# allow_bulk = True # RPC configuration options. Defined in rpc __init__ # The messaging module to use, defaults to kombu. # rpc_backend = quantum.openstack.common.notifier.rpc.impl_kombu diff --git a/quantum/api/v2/base.py b/quantum/api/v2/base.py index 5a013bb685..e499fa9060 100644 --- a/quantum/api/v2/base.py +++ b/quantum/api/v2/base.py @@ -117,15 +117,23 @@ def verbose(request): class Controller(object): - def __init__(self, plugin, collection, resource, attr_info): + def __init__(self, plugin, collection, resource, + attr_info, allow_bulk=False): self._plugin = plugin self._collection = collection self._resource = resource self._attr_info = attr_info + self._allow_bulk = allow_bulk + self._native_bulk = self._is_native_bulk_supported() self._policy_attrs = [name for (name, info) in self._attr_info.items() if info.get('required_by_policy')] self._publisher_id = notifier_api.publisher_id('network') + def _is_native_bulk_supported(self): + native_bulk_attr_name = ("_%s__native_bulk_support" + % self._plugin.__class__.__name__) + return getattr(self._plugin, native_bulk_attr_name, False) + def _is_visible(self, attr): attr_val = self._attr_info.get(attr) return attr_val and attr_val['is_visible'] @@ -209,6 +217,32 @@ class Controller(object): # doesn't exist raise webob.exc.HTTPNotFound() + def _emulate_bulk_create(self, obj_creator, request, body): + objs = [] + try: + for item in body[self._collection]: + kwargs = {self._resource: item} + objs.append(self._view(obj_creator(request.context, + **kwargs))) + return objs + # Note(salvatore-orlando): broad catch as in theory a plugin + # could raise any kind of exception + except Exception as ex: + for obj in objs: + delete_action = "delete_%s" % self._resource + obj_deleter = getattr(self._plugin, delete_action) + try: + obj_deleter(request.context, obj['id']) + except Exception: + # broad catch as our only purpose is to log the exception + LOG.exception("Unable to undo add for %s %s", + self._resource, obj['id']) + # TODO(salvatore-orlando): The object being processed when the + # plugin raised might have been created or not in the db. + # We need a way for ensuring that if it has been created, + # it is then deleted + raise + def create(self, request, body=None): """Creates a new instance of the requested entity""" notifier_api.notify(request.context, @@ -216,10 +250,8 @@ class Controller(object): self._resource + '.create.start', notifier_api.INFO, body) - body = self._prepare_request_body(request.context, body, True, - allow_bulk=True) + body = self._prepare_request_body(request.context, body, True) action = "create_%s" % self._resource - # Check authz try: if self._collection in body: @@ -256,16 +288,30 @@ class Controller(object): LOG.exception("Create operation not authorized") raise webob.exc.HTTPForbidden() - obj_creator = getattr(self._plugin, action) - kwargs = {self._resource: body} - obj = obj_creator(request.context, **kwargs) - result = {self._resource: self._view(obj)} - notifier_api.notify(request.context, - self._publisher_id, - self._resource + '.create.end', - notifier_api.INFO, - result) - return result + def notify(create_result): + notifier_api.notify(request.context, + self._publisher_id, + self._resource + '.create.end', + notifier_api.INFO, + create_result) + return create_result + + if self._collection in body and self._native_bulk: + # plugin does atomic bulk create operations + obj_creator = getattr(self._plugin, "%s_bulk" % action) + objs = obj_creator(request.context, body) + return notify({self._collection: [self._view(obj) + for obj in objs]}) + else: + obj_creator = getattr(self._plugin, action) + if self._collection in body: + # Emulate atomic bulk behavior + objs = self._emulate_bulk_create(obj_creator, request, body) + return notify({self._collection: objs}) + else: + kwargs = {self._resource: body} + obj = obj_creator(request.context, **kwargs) + return notify({self._resource: self._view(obj)}) def delete(self, request, id): """Deletes the specified entity""" @@ -355,8 +401,7 @@ class Controller(object): " that tenant_id is specified") raise webob.exc.HTTPBadRequest(msg) - def _prepare_request_body(self, context, body, is_create, - allow_bulk=False): + def _prepare_request_body(self, context, body, is_create): """ verifies required attributes are in request body, and that an attribute is only specified if it is allowed for the given operation (create/update). @@ -369,7 +414,7 @@ class Controller(object): raise webob.exc.HTTPBadRequest(_("Resource body required")) body = body or {self._resource: {}} - if self._collection in body and allow_bulk: + if self._collection in body and self._allow_bulk: bulk_body = [self._prepare_request_body(context, {self._resource: b}, is_create) @@ -382,7 +427,7 @@ class Controller(object): return {self._collection: bulk_body} - elif self._collection in body and not allow_bulk: + elif self._collection in body and not self._allow_bulk: raise webob.exc.HTTPBadRequest("Bulk operation not supported") res_dict = body.get(self._resource) @@ -459,8 +504,8 @@ class Controller(object): }) -def create_resource(collection, resource, plugin, params): - controller = Controller(plugin, collection, resource, params) +def create_resource(collection, resource, plugin, params, allow_bulk=False): + controller = Controller(plugin, collection, resource, params, allow_bulk) # NOTE(jkoelker) To anyone wishing to add "proper" xml support # this is where you do it diff --git a/quantum/api/v2/router.py b/quantum/api/v2/router.py index af07e3ec24..dcd719373d 100644 --- a/quantum/api/v2/router.py +++ b/quantum/api/v2/router.py @@ -69,7 +69,6 @@ class APIRouter(wsgi.Router): def __init__(self, **local_config): mapper = routes_mapper.Mapper() plugin = manager.QuantumManager.get_plugin() - ext_mgr = extensions.PluginAwareExtensionManager.get_instance() ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP) @@ -81,8 +80,10 @@ class APIRouter(wsgi.Router): 'port': 'ports'} def _map_resource(collection, resource, params): + allow_bulk = cfg.CONF.allow_bulk controller = base.create_resource(collection, resource, - plugin, params) + plugin, params, + allow_bulk=allow_bulk) mapper_kwargs = dict(controller=controller, requirements=REQUIREMENTS, **col_kwargs) diff --git a/quantum/common/config.py b/quantum/common/config.py index eb84e9eb41..6b463f3dee 100644 --- a/quantum/common/config.py +++ b/quantum/common/config.py @@ -43,7 +43,8 @@ core_opts = [ cfg.StrOpt('core_plugin', default='quantum.plugins.sample.SamplePlugin.FakePlugin'), cfg.StrOpt('base_mac', default="fa:16:3e:00:00:00"), - cfg.IntOpt('mac_generation_retries', default=16) + cfg.IntOpt('mac_generation_retries', default=16), + cfg.BoolOpt('allow_bulk', default=True), ] # Register the configuration options diff --git a/quantum/db/db_base_plugin_v2.py b/quantum/db/db_base_plugin_v2.py index e50d2e4c77..eb3f997a75 100644 --- a/quantum/db/db_base_plugin_v2.py +++ b/quantum/db/db_base_plugin_v2.py @@ -41,6 +41,11 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2): certain events. """ + # This attribute specifies whether the plugin supports or not + # bulk operations. Name mangling is used in order to ensure it + # is qualified by class + __native_bulk_support = True + def __init__(self): # NOTE(jkoelker) This is an incomlete implementation. Subclasses # must override __init__ and setup the database @@ -673,12 +678,34 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2): "device_id": port["device_id"]} return self._fields(res, fields) + def _create_bulk(self, resource, context, request_items): + objects = [] + collection = "%ss" % resource + items = request_items[collection] + context.session.begin(subtransactions=True) + try: + for item in items: + obj_creator = getattr(self, 'create_%s' % resource) + objects.append(obj_creator(context, item)) + context.session.commit() + except Exception: + LOG.exception("An exception occured while creating " + "the port:%s", item) + context.session.rollback() + raise + return objects + + def create_network_bulk(self, context, networks): + return self._create_bulk('network', context, networks) + def create_network(self, context, network): + """ handle creation of a single network """ + # single request processing n = network['network'] # NOTE(jkoelker) Get the tenant_id outside of the session to avoid # unneeded db action if the operation raises tenant_id = self._get_tenant_id_for_create(context, n) - with context.session.begin(): + with context.session.begin(subtransactions=True): network = models_v2.Network(tenant_id=tenant_id, id=n.get('id') or utils.str_uuid(), name=n['name'], @@ -721,6 +748,9 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2): filters=filters, fields=fields, verbose=verbose) + def create_subnet_bulk(self, context, subnets): + return self._create_bulk('subnet', context, subnets) + def create_subnet(self, context, subnet): s = subnet['subnet'] net = netaddr.IPNetwork(s['cidr']) @@ -728,7 +758,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2): s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1)) tenant_id = self._get_tenant_id_for_create(context, s) - with context.session.begin(): + with context.session.begin(subtransactions=True): network = self._get_network(context, s["network_id"]) self._validate_subnet_cidr(network, s['cidr']) subnet = models_v2.Subnet(tenant_id=tenant_id, @@ -780,13 +810,16 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2): filters=filters, fields=fields, verbose=verbose) + def create_port_bulk(self, context, ports): + return self._create_bulk('port', context, ports) + def create_port(self, context, port): p = port['port'] # NOTE(jkoelker) Get the tenant_id outside of the session to avoid # unneeded db action if the operation raises tenant_id = self._get_tenant_id_for_create(context, p) - with context.session.begin(): + with context.session.begin(subtransactions=True): network = self._get_network(context, p["network_id"]) # Ensure that a MAC address is defined and it is unique on the @@ -817,7 +850,7 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2): # Update the allocated IP's if ips: - with context.session.begin(): + with context.session.begin(subtransactions=True): for ip in ips: LOG.debug("Allocated IP %s (%s/%s/%s)", ip['ip_address'], port['network_id'], ip['subnet_id'], port.id) diff --git a/quantum/plugins/linuxbridge/lb_quantum_plugin.py b/quantum/plugins/linuxbridge/lb_quantum_plugin.py index 1b0507c8d5..80f88b9d48 100644 --- a/quantum/plugins/linuxbridge/lb_quantum_plugin.py +++ b/quantum/plugins/linuxbridge/lb_quantum_plugin.py @@ -196,7 +196,6 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2): super(LinuxBridgePluginV2, self).delete_network(context, net['id']) raise - return net def update_network(self, context, id, network): diff --git a/quantum/plugins/openvswitch/ovs_db_v2.py b/quantum/plugins/openvswitch/ovs_db_v2.py index 7298e439d7..d7c29c1bb4 100644 --- a/quantum/plugins/openvswitch/ovs_db_v2.py +++ b/quantum/plugins/openvswitch/ovs_db_v2.py @@ -40,8 +40,8 @@ def get_vlans(): return [(binding.vlan_id, binding.network_id) for binding in bindings] -def get_vlan(net_id): - session = db.get_session() +def get_vlan(net_id, session=None): + session = session or db.get_session() try: binding = (session.query(ovs_models_v2.VlanBinding). filter_by(network_id=net_id). @@ -51,11 +51,10 @@ def get_vlan(net_id): return binding.vlan_id -def add_vlan_binding(vlan_id, net_id): - session = db.get_session() - binding = ovs_models_v2.VlanBinding(vlan_id, net_id) - session.add(binding) - session.flush() +def add_vlan_binding(vlan_id, net_id, session): + with session.begin(subtransactions=True): + binding = ovs_models_v2.VlanBinding(vlan_id, net_id) + session.add(binding) return binding @@ -114,10 +113,9 @@ def get_vlan_id(vlan_id): return None -def reserve_vlan_id(): +def reserve_vlan_id(session): """Reserve an unused vlan_id""" - session = db.get_session() with session.begin(subtransactions=True): record = (session.query(ovs_models_v2.VlanID). filter_by(vlan_used=False). @@ -129,14 +127,13 @@ def reserve_vlan_id(): return record.vlan_id -def reserve_specific_vlan_id(vlan_id): +def reserve_specific_vlan_id(vlan_id, session): """Reserve a specific vlan_id""" if vlan_id < 1 or vlan_id > 4094: msg = _("Specified VLAN %s outside legal range (1-4094)") % vlan_id raise q_exc.InvalidInput(error_message=msg) - session = db.get_session() with session.begin(subtransactions=True): try: record = (session.query(ovs_models_v2.VlanID). diff --git a/quantum/plugins/openvswitch/ovs_quantum_plugin.py b/quantum/plugins/openvswitch/ovs_quantum_plugin.py index 068f4adc4d..6792cd54f5 100644 --- a/quantum/plugins/openvswitch/ovs_quantum_plugin.py +++ b/quantum/plugins/openvswitch/ovs_quantum_plugin.py @@ -177,6 +177,10 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2): be updated to take advantage of it. """ + # This attribute specifies whether the plugin supports or not + # bulk operations. Name mangling is used in order to ensure it + # is qualified by class + __native_bulk_support = True supported_extension_aliases = ["provider"] def __init__(self, configfile=None): @@ -227,7 +231,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2): def _extend_network_dict(self, context, network): if self._check_provider_view_auth(context, network): if not self.enable_tunneling: - network['provider:vlan_id'] = ovs_db_v2.get_vlan(network['id']) + network['provider:vlan_id'] = ovs_db_v2.get_vlan( + network['id'], context.session) def create_network(self, context, network): net = super(OVSQuantumPluginV2, self).create_network(context, network) @@ -235,15 +240,15 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2): vlan_id = network['network'].get('provider:vlan_id') if vlan_id not in (None, attributes.ATTR_NOT_SPECIFIED): self._enforce_provider_set_auth(context, net) - ovs_db_v2.reserve_specific_vlan_id(vlan_id) + ovs_db_v2.reserve_specific_vlan_id(vlan_id, context.session) else: - vlan_id = ovs_db_v2.reserve_vlan_id() + vlan_id = ovs_db_v2.reserve_vlan_id(context.session) except Exception: super(OVSQuantumPluginV2, self).delete_network(context, net['id']) raise LOG.debug("Created network: %s" % net['id']) - ovs_db_v2.add_vlan_binding(vlan_id, str(net['id'])) + ovs_db_v2.add_vlan_binding(vlan_id, str(net['id']), context.session) self._extend_network_dict(context, net) return net diff --git a/quantum/plugins/openvswitch/tests/unit/test_ovs_db.py b/quantum/plugins/openvswitch/tests/unit/test_ovs_db.py index 707bb52505..8f82ea434a 100644 --- a/quantum/plugins/openvswitch/tests/unit/test_ovs_db.py +++ b/quantum/plugins/openvswitch/tests/unit/test_ovs_db.py @@ -63,46 +63,50 @@ class OVSVlanIdsTest(unittest2.TestCase): self.assertIsNone(ovs_db_v2.get_vlan_id(VLAN_MAX + 5 + 1)) def test_vlan_id_pool(self): + session = db.get_session() vlan_ids = set() for x in xrange(VLAN_MIN, VLAN_MAX + 1): - vlan_id = ovs_db_v2.reserve_vlan_id() + vlan_id = ovs_db_v2.reserve_vlan_id(db.get_session()) self.assertGreaterEqual(vlan_id, VLAN_MIN) self.assertLessEqual(vlan_id, VLAN_MAX) vlan_ids.add(vlan_id) with self.assertRaises(q_exc.NoNetworkAvailable): - vlan_id = ovs_db_v2.reserve_vlan_id() + vlan_id = ovs_db_v2.reserve_vlan_id(session) for vlan_id in vlan_ids: ovs_db_v2.release_vlan_id(vlan_id) def test_invalid_specific_vlan_id(self): + session = db.get_session() with self.assertRaises(q_exc.InvalidInput): - vlan_id = ovs_db_v2.reserve_specific_vlan_id(0) + vlan_id = ovs_db_v2.reserve_specific_vlan_id(0, session) with self.assertRaises(q_exc.InvalidInput): - vlan_id = ovs_db_v2.reserve_specific_vlan_id(4095) + vlan_id = ovs_db_v2.reserve_specific_vlan_id(4095, session) def test_specific_vlan_id_inside_pool(self): + session = db.get_session() vlan_id = VLAN_MIN + 5 self.assertFalse(ovs_db_v2.get_vlan_id(vlan_id).vlan_used) - ovs_db_v2.reserve_specific_vlan_id(vlan_id) + ovs_db_v2.reserve_specific_vlan_id(vlan_id, session) self.assertTrue(ovs_db_v2.get_vlan_id(vlan_id).vlan_used) with self.assertRaises(q_exc.VlanIdInUse): - ovs_db_v2.reserve_specific_vlan_id(vlan_id) + ovs_db_v2.reserve_specific_vlan_id(vlan_id, session) ovs_db_v2.release_vlan_id(vlan_id) self.assertFalse(ovs_db_v2.get_vlan_id(vlan_id).vlan_used) def test_specific_vlan_id_outside_pool(self): + session = db.get_session() vlan_id = VLAN_MAX + 5 self.assertIsNone(ovs_db_v2.get_vlan_id(vlan_id)) - ovs_db_v2.reserve_specific_vlan_id(vlan_id) + ovs_db_v2.reserve_specific_vlan_id(vlan_id, session) self.assertTrue(ovs_db_v2.get_vlan_id(vlan_id).vlan_used) with self.assertRaises(q_exc.VlanIdInUse): - ovs_db_v2.reserve_specific_vlan_id(vlan_id) + ovs_db_v2.reserve_specific_vlan_id(vlan_id, session) ovs_db_v2.release_vlan_id(vlan_id) self.assertIsNone(ovs_db_v2.get_vlan_id(vlan_id)) diff --git a/quantum/tests/unit/test_api_v2.py b/quantum/tests/unit/test_api_v2.py index f46eaadba2..0c7360b8c7 100644 --- a/quantum/tests/unit/test_api_v2.py +++ b/quantum/tests/unit/test_api_v2.py @@ -551,16 +551,17 @@ class JSONV2TestCase(APIv2TestBase): self.assertEqual(res.status_int, 422) def test_create_bulk(self): - data = {'networks': [{'name': 'net1', 'admin_state_up': True, + data = {'networks': [{'name': 'net1', + 'admin_state_up': True, 'tenant_id': _uuid()}, - {'name': 'net2', 'admin_state_up': True, + {'name': 'net2', + 'admin_state_up': True, 'tenant_id': _uuid()}]} def side_effect(context, network): - nets = network.copy() - for net in nets['networks']: - net.update({'subnets': []}) - return nets + net = network.copy() + net['network'].update({'subnets': []}) + return net['network'] instance = self.plugin.return_value instance.create_network.side_effect = side_effect @@ -904,7 +905,6 @@ class ExtensionTestCase(unittest.TestCase): self.api = None self.plugin = None cfg.CONF.reset() - # Restore the global RESOURCE_ATTRIBUTE_MAP attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map diff --git a/quantum/tests/unit/test_db_plugin.py b/quantum/tests/unit/test_db_plugin.py index 7dd902f318..9c37e4c8d9 100644 --- a/quantum/tests/unit/test_db_plugin.py +++ b/quantum/tests/unit/test_db_plugin.py @@ -14,6 +14,7 @@ # limitations under the License. import contextlib +import copy import logging import mock import os @@ -36,6 +37,7 @@ from quantum.wsgi import Serializer, JSONDeserializer LOG = logging.getLogger(__name__) +DB_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2' ROOTDIR = os.path.dirname(os.path.dirname(__file__)) ETCDIR = os.path.join(ROOTDIR, 'etc') @@ -62,10 +64,7 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): 'application/json': json_deserializer, } - plugin = test_config.get('plugin_name_v2', - 'quantum.db.db_base_plugin_v2.' - 'QuantumDbPluginV2') - LOG.debug("db plugin test, the plugin is:%s", plugin) + plugin = test_config.get('plugin_name_v2', DB_PLUGIN_KLASS) # Create the default configurations args = ['--config-file', etcdir('quantum.conf.test')] config.parse(args=args) @@ -74,6 +73,14 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab") self.api = APIRouter() + def _is_native_bulk_supported(): + plugin_obj = QuantumManager.get_plugin() + native_bulk_attr_name = ("_%s__native_bulk_support" + % plugin_obj.__class__.__name__) + return getattr(plugin_obj, native_bulk_attr_name, False) + + self._skip_native_bulk = not _is_native_bulk_supported() + def tearDown(self): super(QuantumDbPluginV2TestCase, self).tearDown() # NOTE(jkoelker) for a 'pluggable' framework, Quantum sure @@ -118,6 +125,28 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): data = self._deserializers[ctype].deserialize(response.body)['body'] return data + def _create_bulk(self, fmt, number, resource, data, name='test', **kwargs): + """ Creates a bulk request for any kind of resource """ + objects = [] + collection = "%ss" % resource + for i in range(0, number): + obj = copy.deepcopy(data) + obj[resource]['name'] = "%s_%s" % (name, i) + if 'override' in kwargs and i in kwargs['override']: + obj[resource].update(kwargs['override'][i]) + objects.append(obj) + req_data = {collection: objects} + req = self.new_create_request(collection, req_data, fmt) + if ('set_context' in kwargs and + kwargs['set_context'] is True and + 'tenant_id' in kwargs): + # create a specific auth context for this request + req.environ['quantum.context'] = context.Context( + '', kwargs['tenant_id']) + elif 'context' in kwargs: + req.environ['quantum.context'] = kwargs['context'] + return req.get_response(self.api) + def _create_network(self, fmt, name, admin_status_up, **kwargs): data = {'network': {'name': name, 'admin_state_up': admin_status_up, @@ -134,6 +163,12 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): return network_req.get_response(self.api) + def _create_network_bulk(self, fmt, number, name, + admin_status_up, **kwargs): + base_data = {'network': {'admin_state_up': admin_status_up, + 'tenant_id': self._tenant_id}} + return self._create_bulk(fmt, number, 'network', base_data, **kwargs) + def _create_subnet(self, fmt, net_id, cidr, expected_res_status=None, **kwargs): data = {'subnet': {'network_id': net_id, @@ -157,6 +192,19 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): self.assertEqual(subnet_res.status_int, expected_res_status) return subnet_res + def _create_subnet_bulk(self, fmt, number, net_id, name, + ip_version=4, **kwargs): + base_data = {'subnet': {'network_id': net_id, + 'ip_version': ip_version, + 'tenant_id': self._tenant_id}} + # auto-generate cidrs as they should not overlap + overrides = dict((k, v) + for (k, v) in zip(range(0, number), + [{'cidr': "10.0.%s.0/24" % num} + for num in range(0, number)])) + kwargs.update({'override': overrides}) + return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs) + def _create_port(self, fmt, net_id, expected_res_status=None, **kwargs): content_type = 'application/' + fmt data = {'port': {'network_id': net_id, @@ -196,6 +244,13 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): self.assertEqual(port_res.status_int, expected_res_status) return port_res + def _create_port_bulk(self, fmt, number, net_id, name, + admin_status_up, **kwargs): + base_data = {'port': {'network_id': net_id, + 'admin_state_up': admin_status_up, + 'tenant_id': self._tenant_id}} + return self._create_bulk(fmt, number, 'port', base_data, **kwargs) + def _make_subnet(self, fmt, network, gateway, cidr, allocation_pools=None, ip_version=4, enable_dhcp=True): res = self._create_subnet(fmt, @@ -220,6 +275,29 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): req = self.new_delete_request(collection, id) req.get_response(self.api) + def _do_side_effect(self, patched_plugin, orig, *args, **kwargs): + """ Invoked by test cases for injecting failures in plugin """ + def second_call(*args, **kwargs): + raise Exception('boom') + patched_plugin.side_effect = second_call + return orig(*args, **kwargs) + + def _validate_behavior_on_bulk_failure(self, res, collection): + self.assertEqual(res.status_int, 500) + req = self.new_list_request(collection) + res = req.get_response(self.api) + self.assertEquals(res.status_int, 200) + items = self.deserialize('json', res) + self.assertEqual(len(items[collection]), 0) + + def _validate_behavior_on_bulk_success(self, res, collection, + names=['test_0', 'test_1']): + self.assertEqual(res.status_int, 201) + items = self.deserialize('json', res)[collection] + self.assertEqual(len(items), 2) + self.assertEqual(items[0]['name'], 'test_0') + self.assertEqual(items[1]['name'], 'test_1') + @contextlib.contextmanager def network(self, name='net1', admin_status_up=True, @@ -429,6 +507,90 @@ class TestPortsV2(QuantumDbPluginV2TestCase): self.assertEquals(port['port'][k], v) self.assertTrue('mac_address' in port['port']) + def test_create_ports_bulk_native(self): + if self._skip_native_bulk: + self.skipTest("Plugin does not support native bulk port create") + with self.network() as net: + res = self._create_port_bulk('json', 2, net['network']['id'], + 'test', True) + self._validate_behavior_on_bulk_success(res, 'ports') + + def test_create_ports_bulk_emulated(self): + real_has_attr = hasattr + + #ensures the API choose the emulation code path + def fakehasattr(item, attr): + if attr.endswith('__native_bulk_support'): + return False + return real_has_attr(item, attr) + + with mock.patch('__builtin__.hasattr', + new=fakehasattr): + with self.network() as net: + res = self._create_port_bulk('json', 2, net['network']['id'], + 'test', True) + self._validate_behavior_on_bulk_success(res, 'ports') + + def test_create_ports_bulk_wrong_input(self): + with self.network() as net: + overrides = {1: {'admin_state_up': 'doh'}} + res = self._create_port_bulk('json', 2, net['network']['id'], + 'test', True, + override=overrides) + self.assertEqual(res.status_int, 400) + req = self.new_list_request('ports') + res = req.get_response(self.api) + self.assertEquals(res.status_int, 200) + ports = self.deserialize('json', res) + self.assertEqual(len(ports['ports']), 0) + + def test_create_ports_bulk_emulated_plugin_failure(self): + real_has_attr = hasattr + + #ensures the API choose the emulation code path + def fakehasattr(item, attr): + if attr.endswith('__native_bulk_support'): + return False + return real_has_attr(item, attr) + + with mock.patch('__builtin__.hasattr', + new=fakehasattr): + orig = QuantumManager.get_plugin().create_port + with mock.patch.object(QuantumManager.get_plugin(), + 'create_port') as patched_plugin: + + def side_effect(*args, **kwargs): + return self._do_side_effect(patched_plugin, orig, + *args, **kwargs) + + patched_plugin.side_effect = side_effect + with self.network() as net: + res = self._create_port_bulk('json', 2, + net['network']['id'], + 'test', + True) + # We expect a 500 as we injected a fault in the plugin + self._validate_behavior_on_bulk_failure(res, 'ports') + + def test_create_ports_bulk_native_plugin_failure(self): + if self._skip_native_bulk: + self.skipTest("Plugin does not support native bulk port create") + ctx = context.get_admin_context() + with self.network() as net: + orig = QuantumManager._instance.plugin.create_port + with mock.patch.object(QuantumManager._instance.plugin, + 'create_port') as patched_plugin: + + def side_effect(*args, **kwargs): + return self._do_side_effect(patched_plugin, orig, + *args, **kwargs) + + patched_plugin.side_effect = side_effect + res = self._create_port_bulk('json', 2, net['network']['id'], + 'test', True, context=ctx) + # We expect a 500 as we injected a fault in the plugin + self._validate_behavior_on_bulk_failure(res, 'ports') + def test_list_ports(self): with contextlib.nested(self.port(), self.port()) as (port1, port2): req = self.new_list_request('ports', 'json') @@ -1061,6 +1223,77 @@ class TestNetworksV2(QuantumDbPluginV2TestCase): network['network']['id']) self.assertEqual(req.get_response(self.api).status_int, 409) + def test_create_networks_bulk_native(self): + if self._skip_native_bulk: + self.skipTest("Plugin does not support native bulk network create") + res = self._create_network_bulk('json', 2, 'test', True) + self._validate_behavior_on_bulk_success(res, 'networks') + + def test_create_networks_bulk_emulated(self): + real_has_attr = hasattr + + #ensures the API choose the emulation code path + def fakehasattr(item, attr): + if attr.endswith('__native_bulk_support'): + return False + return real_has_attr(item, attr) + + with mock.patch('__builtin__.hasattr', + new=fakehasattr): + res = self._create_network_bulk('json', 2, 'test', True) + self._validate_behavior_on_bulk_success(res, 'networks') + + def test_create_networks_bulk_wrong_input(self): + res = self._create_network_bulk('json', 2, 'test', True, + override={1: + {'admin_state_up': 'doh'}}) + self.assertEqual(res.status_int, 400) + req = self.new_list_request('networks') + res = req.get_response(self.api) + self.assertEquals(res.status_int, 200) + nets = self.deserialize('json', res) + self.assertEqual(len(nets['networks']), 0) + + def test_create_networks_bulk_emulated_plugin_failure(self): + real_has_attr = hasattr + + def fakehasattr(item, attr): + if attr.endswith('__native_bulk_support'): + return False + return real_has_attr(item, attr) + + orig = QuantumManager.get_plugin().create_network + #ensures the API choose the emulation code path + with mock.patch('__builtin__.hasattr', + new=fakehasattr): + with mock.patch.object(QuantumManager.get_plugin(), + 'create_network') as patched_plugin: + + def side_effect(*args, **kwargs): + return self._do_side_effect(patched_plugin, orig, + *args, **kwargs) + + patched_plugin.side_effect = side_effect + res = self._create_network_bulk('json', 2, 'test', True) + # We expect a 500 as we injected a fault in the plugin + self._validate_behavior_on_bulk_failure(res, 'networks') + + def test_create_networks_bulk_native_plugin_failure(self): + if self._skip_native_bulk: + self.skipTest("Plugin does not support native bulk network create") + orig = QuantumManager.get_plugin().create_network + with mock.patch.object(QuantumManager.get_plugin(), + 'create_network') as patched_plugin: + + def side_effect(*args, **kwargs): + return self._do_side_effect(patched_plugin, orig, + *args, **kwargs) + + patched_plugin.side_effect = side_effect + res = self._create_network_bulk('json', 2, 'test', True) + # We expect a 500 as we injected a fault in the plugin + self._validate_behavior_on_bulk_failure(res, 'networks') + def test_list_networks(self): with self.network(name='net1') as net1: with self.network(name='net2') as net2: @@ -1157,6 +1390,77 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase): pass self.assertEquals(ctx_manager.exception.code, 400) + def test_create_subnets_bulk_native(self): + if self._skip_native_bulk: + self.skipTest("Plugin does not support native bulk subnet create") + with self.network() as net: + res = self._create_subnet_bulk('json', 2, net['network']['id'], + 'test') + self._validate_behavior_on_bulk_success(res, 'subnets') + + def test_create_subnets_bulk_emulated(self): + real_has_attr = hasattr + + #ensures the API choose the emulation code path + def fakehasattr(item, attr): + if attr.endswith('__native_bulk_support'): + return False + return real_has_attr(item, attr) + + with mock.patch('__builtin__.hasattr', + new=fakehasattr): + with self.network() as net: + res = self._create_subnet_bulk('json', 2, + net['network']['id'], + 'test') + self._validate_behavior_on_bulk_success(res, 'subnets') + + def test_create_subnets_bulk_emulated_plugin_failure(self): + real_has_attr = hasattr + + #ensures the API choose the emulation code path + def fakehasattr(item, attr): + if attr.endswith('__native_bulk_support'): + return False + return real_has_attr(item, attr) + + with mock.patch('__builtin__.hasattr', + new=fakehasattr): + orig = QuantumManager.get_plugin().create_subnet + with mock.patch.object(QuantumManager.get_plugin(), + 'create_subnet') as patched_plugin: + + def side_effect(*args, **kwargs): + self._do_side_effect(patched_plugin, orig, + *args, **kwargs) + + patched_plugin.side_effect = side_effect + with self.network() as net: + res = self._create_subnet_bulk('json', 2, + net['network']['id'], + 'test') + # We expect a 500 as we injected a fault in the plugin + self._validate_behavior_on_bulk_failure(res, 'subnets') + + def test_create_subnets_bulk_native_plugin_failure(self): + if self._skip_native_bulk: + self.skipTest("Plugin does not support native bulk subnet create") + orig = QuantumManager._instance.plugin.create_subnet + with mock.patch.object(QuantumManager._instance.plugin, + 'create_subnet') as patched_plugin: + def side_effect(*args, **kwargs): + return self._do_side_effect(patched_plugin, orig, + *args, **kwargs) + + patched_plugin.side_effect = side_effect + with self.network() as net: + res = self._create_subnet_bulk('json', 2, + net['network']['id'], + 'test') + + # We expect a 500 as we injected a fault in the plugin + self._validate_behavior_on_bulk_failure(res, 'subnets') + def test_delete_subnet(self): gateway_ip = '10.0.0.1' cidr = '10.0.0.0/24'