Remove `ConfDriver` code

The quota driver ``ConfDriver`` was deprecated in Liberty release.

``NullQuotaDriver`` is created for testing although it could be used
in production if no quota enforcement is needed. However, because
the Quota engine is not plugable (is an extension always loaded), it
could be interesting to make it plugable as any other plugin.

This patch also creates a Quota engine driver API class that should be
used in any Quota engine driver. Currently it is used in the three
in-tree drivers implemented: ``NullQuotaDriver``, ``DbQuotaDriver``
and ``DbQuotaNoLockDriver``.

Change-Id: Ib4af80e18fac52b9f68f26c84a215415e63c2822
Closes-Bug: #1928211
This commit is contained in:
Rodolfo Alonso Hernandez 2021-05-12 13:28:36 +00:00
parent 389584c389
commit ad31c58d60
19 changed files with 263 additions and 227 deletions

View File

@ -38,7 +38,7 @@ default quota values:
quota_port = 50
# default driver to use for quota checks
quota_driver = neutron.quota.ConfDriver
quota_driver = neutron.quota.DbQuotaNoLockDriver
OpenStack Networking also supports quotas for L3 resources:
router and floating IP. Add these lines to the

View File

@ -63,10 +63,9 @@ three quota drivers:
* neutron.db.quota.driver.DbQuotaDriver
* neutron.db.quota.driver_nolock.DbQuotaNoLockDriver (default)
* neutron.quota.ConfDriver
The latter driver is however deprecated. The ``DbQuotaNoLockDriver`` is the
default quota driver, defined in the configuration option ``quota_driver``.
The ``DbQuotaNoLockDriver`` is the default quota driver, defined in the
configuration option ``quota_driver``.
The Quota API extension handles quota management, whereas the Quota Engine
component handles quota enforcement. This API extension is loaded like any

View File

@ -489,7 +489,8 @@ class Controller(object):
tenant,
{self._resource: delta},
self._plugin)
reservations.append(reservation)
if reservation:
reservations.append(reservation)
except exceptions.QuotaResourceUnknown as e:
# We don't want to quota this resource
LOG.debug(e)

View File

@ -21,7 +21,6 @@ from neutron._i18n import _
QUOTA_DB_MODULE = 'neutron.db.quota.driver_nolock'
QUOTA_DB_DRIVER = QUOTA_DB_MODULE + '.DbQuotaNoLockDriver'
QUOTA_CONF_DRIVER = 'neutron.quota.ConfDriver'
QUOTAS_CFG_GROUP = 'QUOTAS'
DEFAULT_QUOTA = -1

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import collections
import datetime
@ -236,3 +237,170 @@ def get_reservations_for_resources(context, tenant_id, resources,
@db_api.CONTEXT_WRITER
def remove_expired_reservations(context, tenant_id=None):
return quota_obj.Reservation.delete_expired(context, utcnow(), tenant_id)
class QuotaDriverAPI(object, metaclass=abc.ABCMeta):
@staticmethod
@abc.abstractmethod
def get_default_quotas(context, resources, project_id):
"""Given a list of resources, retrieve the default quotas set for
a tenant.
:param context: The request context, for access checks.
:param resources: A dictionary of the registered resource keys.
:param project_id: The ID of the project to return default quotas for.
:return: dict from resource name to dict of name and limit
"""
@staticmethod
@abc.abstractmethod
def get_tenant_quotas(context, resources, project_id):
"""Retrieve the quotas for the given list of resources and project
:param context: The request context, for access checks.
:param resources: A dictionary of the registered resource keys.
:param project_id: The ID of the project to return quotas for.
:return: dict from resource name to dict of name and limit
"""
@staticmethod
@abc.abstractmethod
def get_detailed_tenant_quotas(context, resources, project_id):
"""Retrieve detailed quotas for the given list of resources and project
:param context: The request context, for access checks.
:param resources: A dictionary of the registered resource keys.
:param project_id: The ID of the project to return quotas for.
:return dict: mapping resource name in dict to its corresponding limit
used and reserved. Reserved currently returns default
value of 0
"""
@staticmethod
@abc.abstractmethod
def delete_tenant_quota(context, project_id):
"""Delete the quota entries for a given project_id.
After deletion, this tenant will use default quota values in conf.
Raise a "not found" error if the quota for the given tenant was
never defined.
:param context: The request context, for access checks.
:param project_id: The ID of the project to return quotas for.
"""
@staticmethod
@abc.abstractmethod
def get_all_quotas(context, resources):
"""Given a list of resources, retrieve the quotas for the all tenants.
:param context: The request context, for access checks.
:param resources: A dictionary of the registered resource keys.
:return: quotas list of dict of project_id:, resourcekey1:
resourcekey2: ...
"""
@staticmethod
@abc.abstractmethod
def update_quota_limit(context, project_id, resource, limit):
"""Update the quota limit for a resource in a project
:param context: The request context, for access checks.
:param project_id: The ID of the project to update the quota.
:param resource: the resource to update the quota.
:param limit: new resource quota limit.
"""
@staticmethod
@abc.abstractmethod
def make_reservation(context, project_id, resources, deltas, plugin):
"""Make multiple resource reservations for a given project
:param context: The request context, for access checks.
:param resources: A dictionary of the registered resource keys.
:param project_id: The ID of the project to make the reservations for.
:return: ``ReservationInfo`` object.
"""
@staticmethod
@abc.abstractmethod
def commit_reservation(context, reservation_id):
"""Commit a reservation register
:param context: The request context, for access checks.
:param reservation_id: ID of the reservation register to commit.
"""
@staticmethod
@abc.abstractmethod
def cancel_reservation(context, reservation_id):
"""Cancel a reservation register
:param context: The request context, for access checks.
:param reservation_id: ID of the reservation register to cancel.
"""
@staticmethod
@abc.abstractmethod
def limit_check(context, project_id, resources, values):
"""Check simple quota limits.
For limits--those quotas for which there is no usage
synchronization function--this method checks that a set of
proposed values are permitted by the limit restriction.
If any of the proposed values is over the defined quota, an
OverQuota exception will be raised with the sorted list of the
resources which are too high. Otherwise, the method returns
nothing.
:param context: The request context, for access checks.
:param project_id: The ID of the project to make the reservations for.
:param resources: A dictionary of the registered resource.
:param values: A dictionary of the values to check against the
quota.
"""
class NullQuotaDriver(QuotaDriverAPI):
@staticmethod
def get_default_quotas(context, resources, project_id):
pass
@staticmethod
def get_tenant_quotas(context, resources, project_id):
pass
@staticmethod
def get_detailed_tenant_quotas(context, resources, project_id):
pass
@staticmethod
def delete_tenant_quota(context, project_id):
pass
@staticmethod
def get_all_quotas(context, resources):
pass
@staticmethod
def update_quota_limit(context, project_id, resource, limit):
pass
@staticmethod
def make_reservation(context, project_id, resources, deltas, plugin):
pass
@staticmethod
def commit_reservation(context, reservation_id):
pass
@staticmethod
def cancel_reservation(context, reservation_id):
pass
@staticmethod
def limit_check(context, project_id, resources, values):
pass

View File

@ -27,7 +27,7 @@ from neutron.quota import resource as res
LOG = log.getLogger(__name__)
class DbQuotaDriver(object):
class DbQuotaDriver(quota_api.QuotaDriverAPI):
"""Driver to perform necessary checks to enforce quotas and obtain quota
information.

View File

@ -142,8 +142,9 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
sg.rules.append(egress_rule)
sg.obj_reset_changes(['rules'])
quota.QUOTAS.commit_reservation(context,
reservation.reservation_id)
if reservation:
quota.QUOTAS.commit_reservation(context,
reservation.reservation_id)
# fetch sg from db to load the sg rules with sg model.
sg = sg_obj.SecurityGroup.get_object(context, id=sg.id)

View File

@ -53,7 +53,8 @@ class QuotaEnforcementHook(hooks.PecanHook):
LOG.debug("Made reservation on behalf of %(tenant_id)s "
"for: %(delta)s",
{'tenant_id': tenant_id, 'delta': {resource: delta}})
reservations.append(reservation)
if reservation:
reservations.append(reservation)
except exceptions.QuotaResourceUnknown as e:
# Quotas cannot be enforced on this resource
LOG.debug(e)

View File

@ -12,141 +12,23 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Quotas for instances, volumes, and floating ips."""
import sys
from neutron_lib import exceptions
from oslo_config import cfg
from oslo_log import log as logging
from oslo_log import versionutils
from oslo_utils import importutils
import webob
from neutron._i18n import _
from neutron.conf import quota
from neutron.db.quota import api as quota_api
from neutron.quota import resource_registry
LOG = logging.getLogger(__name__)
QUOTA_DB_MODULE = quota.QUOTA_DB_MODULE
QUOTA_DB_DRIVER = quota.QUOTA_DB_DRIVER
QUOTA_CONF_DRIVER = quota.QUOTA_CONF_DRIVER
# Register the configuration options
quota.register_quota_opts(quota.core_quota_opts)
class ConfDriver(object):
"""Configuration driver.
Driver to perform necessary checks to enforce quotas and obtain
quota information. The default driver utilizes the default values
in neutron.conf.
"""
def _get_quotas(self, context, resources):
"""Get quotas.
A helper method which retrieves the quotas for the specific
resources identified by keys, and which apply to the current
context.
:param context: The request context, for access checks.
:param resources: A dictionary of the registered resources.
"""
quotas = {}
for resource in resources.values():
quotas[resource.name] = resource.default
return quotas
def limit_check(self, context, tenant_id,
resources, values):
"""Check simple quota limits.
For limits--those quotas for which there is no usage
synchronization function--this method checks that a set of
proposed values are permitted by the limit restriction.
If any of the proposed values is over the defined quota, an
OverQuota exception will be raised with the sorted list of the
resources which are too high. Otherwise, the method returns
nothing.
:param context: The request context, for access checks.
:param tenant_id: The tenant_id to check quota.
:param resources: A dictionary of the registered resources.
:param values: A dictionary of the values to check against the
quota.
"""
# Ensure no value is less than zero
unders = [key for key, val in values.items() if val < 0]
if unders:
raise exceptions.InvalidQuotaValue(unders=sorted(unders))
# Get the applicable quotas
quotas = self._get_quotas(context, resources)
# Check the quotas and construct a list of the resources that
# would be put over limit by the desired values
overs = [key for key, val in values.items()
if quotas[key] >= 0 and quotas[key] < val]
if overs:
raise exceptions.OverQuota(overs=sorted(overs), quotas=quotas,
usages={})
@staticmethod
def get_tenant_quotas(context, resources, tenant_id):
quotas = {}
sub_resources = dict((k, v) for k, v in resources.items())
for resource in sub_resources.values():
quotas[resource.name] = resource.default
return quotas
@staticmethod
def get_all_quotas(context, resources):
return []
@staticmethod
def delete_tenant_quota(context, tenant_id):
msg = _('Access to this resource was denied.')
raise webob.exc.HTTPForbidden(msg)
@staticmethod
def update_quota_limit(context, tenant_id, resource, limit):
msg = _('Access to this resource was denied.')
raise webob.exc.HTTPForbidden(msg)
def make_reservation(self, context, tenant_id, resources, deltas, plugin):
"""This driver does not support reservations.
This routine is provided for backward compatibility purposes with
the API controllers which have now been adapted to make reservations
rather than counting resources and checking limits - as this
routine ultimately does.
"""
for resource in deltas.keys():
count = QUOTAS.count(context, resource, plugin, tenant_id)
total_use = deltas.get(resource, 0) + count
deltas[resource] = total_use
self.limit_check(
context,
tenant_id,
resource_registry.get_all_resources(),
deltas)
# return a fake reservation - the REST controller expects it
return quota_api.ReservationInfo('fake', None, None, None)
def commit_reservation(self, context, reservation_id):
"""This is a noop as this driver does not support reservations."""
def cancel_reservation(self, context, reservation_id):
"""This is a noop as this driver does not support reservations."""
class QuotaEngine(object):
"""Represent the set of recognized quotas."""
@ -167,20 +49,8 @@ class QuotaEngine(object):
if self._driver is None:
_driver_class = (self._driver_class or
cfg.CONF.QUOTAS.quota_driver)
if (_driver_class == QUOTA_DB_DRIVER and
QUOTA_DB_MODULE not in sys.modules):
# If quotas table is not loaded, force config quota driver.
_driver_class = QUOTA_CONF_DRIVER
LOG.info("ConfDriver is used as quota_driver because the "
"loaded plugin does not support 'quotas' table.")
if isinstance(_driver_class, str):
_driver_class = importutils.import_object(_driver_class)
if isinstance(_driver_class, ConfDriver):
versionutils.report_deprecated_feature(
LOG, ("The quota driver neutron.quota.ConfDriver is "
"deprecated as of Liberty. "
"neutron.db.quota.driver.DbQuotaDriver should "
"be used in its place"))
self._driver = _driver_class
LOG.info('Loaded quota_driver: %s.', _driver_class)
return self._driver

View File

@ -1032,7 +1032,8 @@ class ExtensionExtendedAttributeTestCase(base.BaseTestCase):
self.agentscheduler_dbMinxin = directory.get_plugin()
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
cfg.CONF.set_override('quota_driver',
'neutron.db.quota.api.NullQuotaDriver',
group='QUOTAS')
def _do_request(self, method, path, data=None, params=None, action=None):

View File

@ -42,7 +42,6 @@ from neutron.api.v2 import base as v2_base
from neutron.api.v2 import router
from neutron import policy
from neutron import quota
from neutron.quota import resource_registry
from neutron.tests import base
from neutron.tests import tools
from neutron.tests.unit import dummy_plugin
@ -50,6 +49,7 @@ from neutron.tests.unit import testlib_api
EXTDIR = os.path.join(base.ROOTDIR, 'unit/extensions')
NULL_QUOTA_DRIVER = 'neutron.db.quota.api.NullQuotaDriver'
_uuid = uuidutils.generate_uuid
@ -98,7 +98,7 @@ class APIv2TestBase(base.BaseTestCase):
self.api = webtest.TestApp(api)
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
cfg.CONF.set_override('quota_driver', quota.QUOTA_DB_DRIVER,
group='QUOTAS')
# APIRouter initialization resets policy module, re-initializing it
@ -1302,6 +1302,9 @@ class NotificationTest(APIv2TestBase):
def setUp(self):
super(NotificationTest, self).setUp()
fake_notifier.reset()
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', NULL_QUOTA_DRIVER,
group='QUOTAS')
def _resource_op_notifier(self, opname, resource, expected_errors=False):
initial_input = {resource: {'name': 'myname'}}
@ -1354,9 +1357,10 @@ class NotificationTest(APIv2TestBase):
class RegistryNotificationTest(APIv2TestBase):
def setUp(self):
# This test does not have database support so tracking cannot be used
cfg.CONF.set_override('track_quota_usage', False, group='QUOTAS')
super(RegistryNotificationTest, self).setUp()
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', NULL_QUOTA_DRIVER,
group='QUOTAS')
def _test_registry_notify(self, opname, resource, initial_input=None):
instance = self.plugin.return_value
@ -1400,69 +1404,28 @@ class RegistryNotificationTest(APIv2TestBase):
class QuotaTest(APIv2TestBase):
"""This class checks the quota enforcement API, regardless of the driver"""
def setUp(self):
# This test does not have database support so tracking cannot be used
cfg.CONF.set_override('track_quota_usage', False, group='QUOTAS')
super(QuotaTest, self).setUp()
# Use mock to let the API use a different QuotaEngine instance for
# unit test in this class. This will ensure resource are registered
# again and instantiated with neutron.quota.resource.CountableResource
replacement_registry = resource_registry.ResourceRegistry()
registry_patcher = mock.patch('neutron.quota.resource_registry.'
'ResourceRegistry.get_instance')
mock_registry = registry_patcher.start().return_value
mock_registry.get_resource = replacement_registry.get_resource
mock_registry.resources = replacement_registry.resources
# Register a resource
replacement_registry.register_resource_by_name('network')
def test_create_network_quota(self):
cfg.CONF.set_override('quota_network', 1, group='QUOTAS')
def test_create_network_quota_exceeded(self):
initial_input = {'network': {'name': 'net1', 'tenant_id': _uuid()}}
full_input = {'network': {'admin_state_up': True, 'subnets': []}}
full_input['network'].update(initial_input['network'])
instance = self.plugin.return_value
instance.get_networks_count.return_value = 1
res = self.api.post_json(
_get_path('networks'), initial_input, expect_errors=True)
instance.get_networks_count.assert_called_with(mock.ANY,
filters=mock.ANY)
self.assertIn("Quota exceeded for resources",
res.json['NeutronError']['message'])
def test_create_network_quota_no_counts(self):
cfg.CONF.set_override('quota_network', 1, group='QUOTAS')
initial_input = {'network': {'name': 'net1', 'tenant_id': _uuid()}}
full_input = {'network': {'admin_state_up': True, 'subnets': []}}
full_input['network'].update(initial_input['network'])
instance = self.plugin.return_value
instance.get_networks_count.side_effect = (
NotImplementedError())
instance.get_networks.return_value = ["foo"]
res = self.api.post_json(
_get_path('networks'), initial_input, expect_errors=True)
instance.get_networks_count.assert_called_with(mock.ANY,
filters=mock.ANY)
with mock.patch.object(quota.QUOTAS, 'make_reservation',
side_effect=n_exc.OverQuota(overs='network')):
res = self.api.post_json(
_get_path('networks'), initial_input, expect_errors=True)
self.assertIn("Quota exceeded for resources",
res.json['NeutronError']['message'])
def test_create_network_quota_without_limit(self):
cfg.CONF.set_override('quota_network', -1, group='QUOTAS')
initial_input = {'network': {'name': 'net1', 'tenant_id': _uuid()}}
instance = self.plugin.return_value
instance.get_networks_count.return_value = 3
res = self.api.post_json(
_get_path('networks'), initial_input)
with mock.patch.object(quota.QUOTAS, 'make_reservation'), \
mock.patch.object(quota.QUOTAS, 'commit_reservation'):
res = self.api.post_json(
_get_path('networks'), initial_input)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
class ExtensionTestCase(base.BaseTestCase):
def setUp(self):
# This test does not have database support so tracking cannot be used
cfg.CONF.set_override('track_quota_usage', False, group='QUOTAS')
super(ExtensionTestCase, self).setUp()
plugin = 'neutron.neutron_plugin_base_v2.NeutronPluginBaseV2'
# Ensure existing ExtensionManager is not used
@ -1487,7 +1450,7 @@ class ExtensionTestCase(base.BaseTestCase):
self.api = webtest.TestApp(api)
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
cfg.CONF.set_override('quota_driver', NULL_QUOTA_DRIVER,
group='QUOTAS')
def test_extended_create(self):

View File

@ -63,6 +63,8 @@ from neutron.ipam import exceptions as ipam_exc
from neutron.objects import network as network_obj
from neutron.objects import router as l3_obj
from neutron import policy
from neutron import quota
from neutron.quota import resource_registry
from neutron.tests import base
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit import testlib_api
@ -104,17 +106,26 @@ def _get_create_db_method(resource):
return 'create_%s' % resource
def _set_temporary_quota(resource, default_value):
quota_name = uuidutils.generate_uuid(dashed=False)
opt = cfg.IntOpt(quota_name, default=default_value)
cfg.CONF.register_opt(opt, group='QUOTAS')
resources = resource_registry.ResourceRegistry.get_instance().resources
resources[resource].flag = quota_name
class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
fmt = 'json'
resource_prefix_map = {}
block_dhcp_notifier = True
quota_db_driver = quota_conf.QUOTA_DB_DRIVER
def setUp(self, plugin=None, service_plugins=None,
ext_mgr=None):
quota.QUOTAS._driver = None
quota_conf.register_quota_opts(quota_conf.core_quota_opts, cfg.CONF)
cfg.CONF.set_override(
'quota_driver', 'neutron.db.quota.driver.DbQuotaDriver',
group=quota_conf.QUOTAS_CFG_GROUP)
cfg.CONF.set_override('quota_driver', self.quota_db_driver,
group=quota_conf.QUOTAS_CFG_GROUP)
super(NeutronDbPluginV2TestCase, self).setUp()
cfg.CONF.set_override('notify_nova_on_port_status_changes', False)
cfg.CONF.set_override('allow_overlapping_ips', True)
@ -146,6 +157,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
cfg.CONF.set_override('base_mac', "12:34:56:78:00:00")
cfg.CONF.set_override('max_dns_nameservers', 2)
cfg.CONF.set_override('max_subnet_host_routes', 2)
resource_registry.ResourceRegistry._instance = None
self.api = router.APIRouter()
# Set the default status
self.net_create_status = 'ACTIVE'
@ -1694,8 +1706,8 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
res['port']['fixed_ips'])
def test_create_ports_native_quotas(self):
quota = 1
cfg.CONF.set_override('quota_port', quota, group='QUOTAS')
self._tenant_id = uuidutils.generate_uuid()
_set_temporary_quota('port', 1)
with self.network() as network:
res = self._create_port(self.fmt, network['network']['id'])
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
@ -1706,7 +1718,8 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk port create")
quota = 4
cfg.CONF.set_override('quota_port', quota, group='QUOTAS')
_set_temporary_quota('port', quota)
self._tenant_id = uuidutils.generate_uuid()
with self.network() as network:
res = self._create_port_bulk(self.fmt, quota + 1,
network['network']['id'],
@ -2861,7 +2874,8 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
def test_create_networks_native_quotas(self):
quota = 1
cfg.CONF.set_override('quota_network', quota, group='QUOTAS')
_set_temporary_quota('network', quota)
self._tenant_id = uuidutils.generate_uuid()
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
@ -2873,7 +2887,8 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")
quota = 4
cfg.CONF.set_override('quota_network', quota, group='QUOTAS')
_set_temporary_quota('network', quota)
self._tenant_id = uuidutils.generate_uuid()
res = self._create_network_bulk(self.fmt, quota + 1, 'test', True)
self._validate_behavior_on_bulk_failure(
res, 'networks',
@ -2883,7 +2898,8 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")
quota = 2
cfg.CONF.set_override('quota_network', quota, group='QUOTAS')
_set_temporary_quota('network', quota)
self._tenant_id = uuidutils.generate_uuid()
networks = [{'network': {'name': 'n1',
'tenant_id': self._tenant_id}},
{'network': {'name': 'n2',
@ -2900,7 +2916,8 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk network create")
quota = 2
cfg.CONF.set_override('quota_network', quota, group='QUOTAS')
_set_temporary_quota('network', quota)
self._tenant_id = uuidutils.generate_uuid()
networks = [{'network': {'name': 'n1',
'tenant_id': self._tenant_id}},
{'network': {'name': 'n2',
@ -5120,7 +5137,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
def test_create_subnets_native_quotas(self):
quota = 1
cfg.CONF.set_override('quota_subnet', quota, group='QUOTAS')
_set_temporary_quota('subnet', quota)
self._tenant_id = uuidutils.generate_uuid()
with self.network() as network:
res = self._create_subnet(
self.fmt, network['network']['id'], '10.0.0.0/24',
@ -5135,7 +5153,8 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk subnet create")
quota = 4
cfg.CONF.set_override('quota_subnet', quota, group='QUOTAS')
_set_temporary_quota('subnet', quota)
self._tenant_id = uuidutils.generate_uuid()
with self.network() as network:
res = self._create_subnet_bulk(self.fmt, quota + 1,
network['network']['id'],

View File

@ -83,7 +83,7 @@ class ExtensionTestCase(testlib_api.WebTestCase):
setattr(instance, native_sorting_attr_name, True)
if use_quota:
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
cfg.CONF.set_override('quota_driver', quota.QUOTA_DB_DRIVER,
group='QUOTAS')
setattr(instance, 'path_prefix', resource_prefix)

View File

@ -752,7 +752,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
def test_create_routers_native_quotas(self):
tenant_id = _uuid()
quota = 1
cfg.CONF.set_override('quota_router', quota, group='QUOTAS')
test_db_base_plugin_v2._set_temporary_quota('router', quota)
res = self._create_router(self.fmt, tenant_id)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self._create_router(self.fmt, tenant_id)
@ -3395,7 +3395,8 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
def test_create_floatingips_native_quotas(self):
quota = 1
cfg.CONF.set_override('quota_floatingip', quota, group='QUOTAS')
test_db_base_plugin_v2._set_temporary_quota('floatingip', quota)
self._tenant_id = uuidutils.generate_uuid()
with self.subnet() as public_sub:
self._set_net_external(public_sub['subnet']['network_id'])
res = self._create_floatingip(

View File

@ -80,7 +80,7 @@ class ProvidernetExtensionTestCase(testlib_api.WebTestCase):
self.api = webtest.TestApp(router.APIRouter())
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
cfg.CONF.set_override('quota_driver', quota.QUOTA_DB_DRIVER,
group='QUOTAS')
def _prepare_net_data(self):

View File

@ -88,9 +88,7 @@ class QuotaExtensionDbTestCase(QuotaExtensionTestCase):
def setUp(self):
cfg.CONF.set_override(
'quota_driver',
'neutron.db.quota.driver.DbQuotaDriver',
group='QUOTAS')
'quota_driver', quota.QUOTA_DB_DRIVER, group='QUOTAS')
super(QuotaExtensionDbTestCase, self).setUp()
def test_quotas_loaded_right(self):
@ -423,9 +421,7 @@ class QuotaExtensionCfgTestCase(QuotaExtensionTestCase):
def setUp(self):
cfg.CONF.set_override(
'quota_driver',
'neutron.quota.ConfDriver',
group='QUOTAS')
'quota_driver', quota.QUOTA_DB_DRIVER, group='QUOTAS')
super(QuotaExtensionCfgTestCase, self).setUp()
def test_quotas_default_values(self):
@ -466,7 +462,7 @@ class QuotaExtensionCfgTestCase(QuotaExtensionTestCase):
res = self.api.put(_get_path('quotas', id=tenant_id, fmt=self.fmt),
self.serialize(quotas),
expect_errors=True)
self.assertEqual(403, res.status_int)
self.assertEqual(200, res.status_int)
def test_delete_quotas_forbidden(self):
tenant_id = 'tenant_id1'
@ -518,11 +514,8 @@ class TestQuotaDriverLoad(base.BaseTestCase):
self.assertEqual(loaded_driver, driver.__class__.__name__)
def test_quota_driver_load(self):
for klass in (quota.ConfDriver, driver.DbQuotaDriver,
for klass in (driver.DbQuotaDriver,
driver_nolock.DbQuotaNoLockDriver):
self._test_quota_driver(
'.'.join([klass.__module__, klass.__name__]),
klass.__name__, True)
def test_quota_driver_fallback_conf_driver(self):
self._test_quota_driver(quota.QUOTA_DB_DRIVER, 'ConfDriver', False)

View File

@ -1977,8 +1977,8 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
sgr = self._list('security-group-rules').get(
'security_group_rules')
quota = len(sgr) + 1
cfg.CONF.set_override(
'quota_security_group_rule', quota, group='QUOTAS')
test_db_base_plugin_v2._set_temporary_quota('security_group_rule',
quota)
security_group_id = sg['security_group']['id']
rule = self._build_security_group_rule(

View File

@ -72,7 +72,7 @@ class VlanTransparentExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
ext_mgr=ext_mgr)
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
cfg.CONF.set_override('quota_driver', quota.QUOTA_DB_DRIVER,
group='QUOTAS')
def test_network_create_with_vlan_transparent_attr(self):

View File

@ -16,8 +16,11 @@ from neutron_lib import context
from neutron_lib import fixture
from oslo_utils import uuidutils
from neutron.conf import quota as quota_conf
from neutron.db.quota import api as quota_db_api
from neutron import quota
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit.db import test_db_base_plugin_v2
from neutron.tests.unit.extensions import test_l3
from neutron.tests.unit.extensions import test_securitygroup
from neutron.tests.unit.plugins.ml2 import base as ml2_base
@ -38,9 +41,17 @@ class BaseTestTrackedResources(test_plugin.Ml2PluginV2TestCase,
def setUp(self):
self.ctx = context.get_admin_context()
self.addCleanup(self._cleanup)
test_db_base_plugin_v2.NeutronDbPluginV2TestCase.quota_db_driver = (
'neutron.db.quota.driver.DbQuotaDriver')
super(BaseTestTrackedResources, self).setUp()
self._tenant_id = uuidutils.generate_uuid()
@staticmethod
def _cleanup():
test_db_base_plugin_v2.NeutronDbPluginV2TestCase.quota_db_driver = (
quota_conf.QUOTA_DB_DRIVER)
def _test_init(self, resource_name):
quota_db_api.set_quota_usage(
self.ctx, resource_name, self._tenant_id)
@ -167,10 +178,19 @@ class TestL3ResourcesEventHandler(BaseTestEventHandler,
test_l3.L3NatTestCaseMixin):
def setUp(self):
self.addCleanup(self._cleanup)
test_db_base_plugin_v2.NeutronDbPluginV2TestCase.quota_db_driver = (
'neutron.db.quota.driver.DbQuotaDriver')
super(TestL3ResourcesEventHandler, self).setUp()
self.useFixture(fixture.APIDefinitionFixture())
ext_mgr = test_l3.L3TestExtensionManager()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
quota.QUOTAS._driver = None
@staticmethod
def _cleanup():
test_db_base_plugin_v2.NeutronDbPluginV2TestCase.quota_db_driver = (
quota_conf.QUOTA_DB_DRIVER)
def test_create_delete_floating_ip_triggers_event(self):
net = self._make_network('json', 'meh', True)