Add limit provider

This patch adds the registered limit and project limit provider
Class.

Change-Id: I636cd9555ae6434b0e6dec958ae41ef852a48285
bp: unified-limits
This commit is contained in:
wangxiyuan 2017-11-30 17:17:22 +08:00
parent 0cde8da14e
commit 0b241dcea5
12 changed files with 887 additions and 4 deletions

View File

@ -48,7 +48,7 @@ from keystone.conf import signing
from keystone.conf import token
from keystone.conf import tokenless_auth
from keystone.conf import trust
from keystone.conf import unified_limit
CONF = cfg.CONF
@ -83,6 +83,7 @@ conf_modules = [
token,
tokenless_auth,
trust,
unified_limit,
]

View File

@ -0,0 +1,65 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from keystone.conf import utils
driver = cfg.StrOpt(
'driver',
default='sql',
help=utils.fmt("""
Entry point for the unified limit backend driver in the
`keystone.unified_limit` namespace. Keystone only provides a `sql` driver, so
there's no reason to change this unless you are providing a custom entry point.
"""))
caching = cfg.BoolOpt(
'caching',
default=True,
help=utils.fmt("""
Toggle for unified limit caching. This has no effect unless global caching is
enabled. In a typical deployment, there is no reason to disable this.
"""))
cache_time = cfg.IntOpt(
'cache_time',
help=utils.fmt("""
Time to cache unified limit data, in seconds. This has no effect unless both
global caching and `[unified_limit] caching` are enabled.
"""))
list_limit = cfg.IntOpt(
'list_limit',
help=utils.fmt("""
Maximum number of entities that will be returned in a role collection. This may
be useful to tune if you have a large number of unified limits in your
deployment.
"""))
GROUP_NAME = __name__.split('.')[-1]
ALL_OPTS = [
driver,
caching,
cache_time,
list_limit,
]
def register_opts(conf):
conf.register_opts(ALL_OPTS, group=GROUP_NAME)
def list_opts():
return {GROUP_NAME: ALL_OPTS}

View File

@ -0,0 +1,15 @@
# Copyright 2018 Huawei
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.limit.core import * # noqa

View File

@ -18,15 +18,20 @@ import abc
from oslo_log import log
import six
import keystone.conf
from keystone import exception
LOG = log.getLogger(__name__)
CONF = keystone.conf.CONF
@six.add_metaclass(abc.ABCMeta)
class UnifiedLimitDriverBase(object):
def _get_list_limit(self):
return CONF.unified_limit.list_limit or CONF.list_limit
@abc.abstractmethod
def create_registered_limits(self, registered_limits):
"""Create new registered limits.

110
keystone/limit/core.py Normal file
View File

@ -0,0 +1,110 @@
# Copyright 2018 Huawei
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.common import cache
from keystone.common import driver_hints
from keystone.common import manager
from keystone.common import provider_api
import keystone.conf
from keystone import exception
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
MEMOIZE = cache.get_memoization_decorator(group='unified_limit')
class Manager(manager.Manager):
driver_namespace = 'keystone.unified_limit'
_provides_api = 'unified_limit_api'
def __init__(self):
unified_limit_driver = CONF.unified_limit.driver
super(Manager, self).__init__(unified_limit_driver)
def _assert_resource_exist(self, unified_limit, target):
try:
service_id = unified_limit.get('service_id')
if service_id is not None:
PROVIDERS.catalog_api.get_service(service_id)
region_id = unified_limit.get('region_id')
if region_id is not None:
PROVIDERS.catalog_api.get_region(region_id)
project_id = unified_limit.get('project_id')
if project_id is not None:
PROVIDERS.resource_api.get_project(project_id)
except exception.ServiceNotFound:
raise exception.ValidationError(attribute='service_id',
target=target)
except exception.RegionNotFound:
raise exception.ValidationError(attribute='region_id',
target=target)
except exception.ProjectNotFound:
raise exception.ValidationError(attribute='project_id',
target=target)
def create_registered_limits(self, registered_limits):
for registered_limit in registered_limits:
self._assert_resource_exist(registered_limit, 'registered_limit')
self.driver.create_registered_limits(registered_limits)
return self.list_registered_limits()
def update_registered_limits(self, registered_limits):
for registered_limit in registered_limits:
self._assert_resource_exist(registered_limit, 'registered_limit')
self.driver.update_registered_limits(registered_limits)
for registered_limit in registered_limits:
self.get_registered_limit.invalidate(self, registered_limit['id'])
return self.list_registered_limits()
@manager.response_truncated
def list_registered_limits(self, hints=None):
return self.driver.list_registered_limits(
hints or driver_hints.Hints())
@MEMOIZE
def get_registered_limit(self, registered_limit_id):
return self.driver.get_registered_limit(registered_limit_id)
def delete_registered_limit(self, registered_limit_id):
self.driver.delete_registered_limit(registered_limit_id)
self.get_registered_limit.invalidate(self, registered_limit_id)
def create_limits(self, limits):
for limit in limits:
self._assert_resource_exist(limit, 'limit')
self.driver.create_limits(limits)
return self.list_limits()
def update_limits(self, limits):
for limit in limits:
self._assert_resource_exist(limit, 'limit')
self.driver.update_limits(limits)
for limit in limits:
self.get_limit.invalidate(self, limit['id'])
return self.list_limits()
@manager.response_truncated
def list_limits(self, hints=None):
return self.driver.list_limits(hints or driver_hints.Hints())
@MEMOIZE
def get_limit(self, limit_id):
return self.driver.get_limit(limit_id)
def delete_limit(self, limit_id):
self.driver.delete_limit(limit_id)
self.get_limit.invalidate(self, limit_id)

View File

@ -20,6 +20,7 @@ from keystone import credential
from keystone import endpoint_policy
from keystone import federation
from keystone import identity
from keystone import limit
from keystone import oauth1
from keystone import policy
from keystone import resource
@ -45,9 +46,10 @@ def load_backends():
credential.provider.Manager, resource.DomainConfigManager,
endpoint_policy.Manager, federation.Manager,
identity.generator.Manager, identity.MappingManager,
identity.Manager, identity.ShadowUsersManager, oauth1.Manager,
policy.Manager, resource.Manager, revoke.Manager,
assignment.RoleManager, trust.Manager, token.provider.Manager,
identity.Manager, identity.ShadowUsersManager,
limit.Manager, oauth1.Manager, policy.Manager,
resource.Manager, revoke.Manager, assignment.RoleManager,
trust.Manager, token.provider.Manager,
persistence.PersistenceManager]
drivers = {d._provides_api: d() for d in managers}

View File

@ -461,6 +461,29 @@ def new_trust_ref(trustor_user_id, trustee_user_id, project_id=None,
return ref
def new_registered_limit_ref(**kwargs):
ref = {
'service_id': uuid.uuid4().hex,
'resource_name': uuid.uuid4().hex,
'default_limit': 10
}
ref.update(kwargs)
return ref
def new_limit_ref(**kwargs):
ref = {
'project_id': uuid.uuid4().hex,
'service_id': uuid.uuid4().hex,
'resource_name': uuid.uuid4().hex,
'resource_limit': 10
}
ref.update(kwargs)
return ref
def create_user(api, domain_id, **kwargs):
"""Create a user via the API. Keep the created password.

View File

@ -164,3 +164,12 @@ DOMAINS = [{'description':
'enabled': True,
'id': DEFAULT_DOMAIN_ID,
'name': u'Default'}]
SERVICES = [{'id': uuid.uuid4().hex,
'type': 'type_one',
'enabled': True,
'extra': {'description': 'This is a service for test.',
'name': 'service_one'}
}]
REGIONS = [{'id': 'region_one'}, {'id': 'region_two'}]

View File

View File

@ -0,0 +1,596 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from keystone.common import driver_hints
from keystone.common import provider_api
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit import utils as test_utils
PROVIDERS = provider_api.ProviderAPIs
class RegisteredLimitTests(object):
def test_create_registered_limit_crud(self):
# create one, return all registered_limits
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
res1 = PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1])
self.assertDictEqual(registered_limit_1, res1[0])
# create another, return all registered_limits
registered_limit_2 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='snapshot', default_limit=5, id=uuid.uuid4().hex)
res2 = PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_2])
self.assertEqual(2, len(res2))
for re in res2:
if re['id'] == registered_limit_1['id']:
self.assertDictEqual(registered_limit_1, re)
if re['id'] == registered_limit_2['id']:
self.assertDictEqual(registered_limit_2, re)
def test_create_registered_limit_duplicate(self):
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1])
registered_limit_2 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
self.assertRaises(exception.Conflict,
PROVIDERS.unified_limit_api.create_registered_limits,
[registered_limit_2])
def test_create_multi_registered_limits_duplicate(self):
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1])
# Create with a duplicated one and a normal one. Both of them will not
# be created.
registered_limit_2 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
registered_limit_3 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='snapshot', default_limit=10, id=uuid.uuid4().hex)
self.assertRaises(exception.Conflict,
PROVIDERS.unified_limit_api.create_registered_limits,
[registered_limit_2, registered_limit_3])
reg_limits = PROVIDERS.unified_limit_api.list_registered_limits()
self.assertEqual(1, len(reg_limits))
self.assertEqual(registered_limit_1['id'], reg_limits[0]['id'])
def test_create_registered_limit_invalid_service(self):
registered_limit_1 = unit.new_registered_limit_ref(
service_id=uuid.uuid4().hex,
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
self.assertRaises(exception.ValidationError,
PROVIDERS.unified_limit_api.create_registered_limits,
[registered_limit_1])
def test_create_registered_limit_invalid_region(self):
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=uuid.uuid4().hex,
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
self.assertRaises(exception.ValidationError,
PROVIDERS.unified_limit_api.create_registered_limits,
[registered_limit_1])
def test_update_registered_limit(self):
# create two registered limits
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
registered_limit_2 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='snapshot', default_limit=5, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1, registered_limit_2])
# update one, return all registered_limits
registered_limit_update = {'id': registered_limit_1['id'],
'region_id': 'region_two'}
res = PROVIDERS.unified_limit_api.update_registered_limits(
[registered_limit_update])
for re in res:
if re['id'] == registered_limit_1['id']:
self.assertEqual('region_two', re['region_id'])
if re['id'] == registered_limit_2['id']:
self.assertDictEqual(registered_limit_2, re)
def test_update_registered_limit_invalid_input_return_bad_request(self):
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1])
update_ref = {'id': registered_limit_1['id'],
'service_id': uuid.uuid4().hex}
self.assertRaises(exception.ValidationError,
PROVIDERS.unified_limit_api.update_registered_limits,
[update_ref])
update_ref = {'id': registered_limit_1['id'],
'region_id': 'fake_id'}
self.assertRaises(exception.ValidationError,
PROVIDERS.unified_limit_api.update_registered_limits,
[update_ref])
def test_update_registered_limit_duplicate(self):
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
registered_limit_2 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_two['id'],
resource_name='snapshot', default_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1, registered_limit_2])
# Update registered_limit1 to registered_limit2
update_ref = {'id': registered_limit_1['id'],
'region_id': self.region_two['id'],
'resource_name': 'snapshot'}
self.assertRaises(exception.Conflict,
PROVIDERS.unified_limit_api.update_registered_limits,
[update_ref])
@test_utils.wip("Skipped until Bug 1744195 is resolved")
def test_update_registered_limit_when_reference_limit_exist(self):
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1])
limit_1 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_limits([limit_1])
registered_limit_update = {'id': registered_limit_1['id'],
'region_id': 'region_two'}
self.assertRaises(exception.RegisteredLimitError,
PROVIDERS.unified_limit_api.update_registered_limits,
[registered_limit_update])
registered_limit_2 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_2])
limit_2 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_limits([limit_2])
registered_limit_update = {'id': registered_limit_2['id'],
'region_id': 'region_two'}
self.assertRaises(exception.RegisteredLimitError,
PROVIDERS.unified_limit_api.update_registered_limits,
[registered_limit_update])
def test_list_registered_limits(self):
# create two registered limits
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
registered_limit_2 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='snapshot', default_limit=5, id=uuid.uuid4().hex)
reg_limits_1 = PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1, registered_limit_2])
# list
reg_limits_2 = PROVIDERS.unified_limit_api.list_registered_limits()
self.assertEqual(2, len(reg_limits_2))
self.assertDictEqual(reg_limits_1[0], reg_limits_2[0])
self.assertDictEqual(reg_limits_1[1], reg_limits_2[1])
def test_list_registered_limit_by_limit(self):
self.config_fixture.config(list_limit=1)
# create two registered limits
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
registered_limit_2 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='snapshot', default_limit=5, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1, registered_limit_2])
# list, limit is 1
hints = driver_hints.Hints()
reg_limits = PROVIDERS.unified_limit_api.list_registered_limits(
hints=hints)
self.assertEqual(1, len(reg_limits))
if reg_limits[0]['id'] == registered_limit_1['id']:
self.assertDictEqual(registered_limit_1, reg_limits[0])
else:
self.assertDictEqual(registered_limit_2, reg_limits[0])
def test_list_registered_limit_by_filter(self):
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
registered_limit_2 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_two['id'],
resource_name='snapshot', default_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1, registered_limit_2])
hints = driver_hints.Hints()
hints.add_filter('service_id', self.service_one['id'])
res = PROVIDERS.unified_limit_api.list_registered_limits(hints)
self.assertEqual(2, len(res))
hints = driver_hints.Hints()
hints.add_filter('region_id', self.region_one['id'])
res = PROVIDERS.unified_limit_api.list_registered_limits(hints)
self.assertEqual(1, len(res))
hints = driver_hints.Hints()
hints.add_filter('resource_name', 'backup')
res = PROVIDERS.unified_limit_api.list_registered_limits(hints)
self.assertEqual(0, len(res))
def test_get_registered_limit(self):
# create two registered limits
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
registered_limit_2 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='snapshot', default_limit=5, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1, registered_limit_2])
# show one
res = PROVIDERS.unified_limit_api.get_registered_limit(
registered_limit_2['id'])
self.assertDictEqual(registered_limit_2, res)
def test_get_registered_limit_returns_not_found(self):
self.assertRaises(exception.RegisteredLimitNotFound,
PROVIDERS.unified_limit_api.get_registered_limit,
uuid.uuid4().hex)
def test_delete_registered_limit(self):
# create two registered limits
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
registered_limit_2 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='snapshot', default_limit=5, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1, registered_limit_2])
# delete one
PROVIDERS.unified_limit_api.delete_registered_limit(
registered_limit_1['id'])
self.assertRaises(exception.RegisteredLimitNotFound,
PROVIDERS.unified_limit_api.get_registered_limit,
registered_limit_1['id'])
reg_limits = PROVIDERS.unified_limit_api.list_registered_limits()
self.assertEqual(1, len(reg_limits))
self.assertEqual(registered_limit_2['id'], reg_limits[0]['id'])
def test_delete_registered_limit_returns_not_found(self):
self.assertRaises(exception.RegisteredLimitNotFound,
PROVIDERS.unified_limit_api.delete_registered_limit,
uuid.uuid4().hex)
@test_utils.wip("Skipped until Bug 1744195 is resolved")
def test_delete_registered_limit_when_reference_limit_exist(self):
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1])
limit_1 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_limits([limit_1])
self.assertRaises(exception.RegisteredLimitError,
PROVIDERS.unified_limit_api.delete_registered_limit,
registered_limit_1['id'])
registered_limit_2 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_2])
limit_2 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_limits([limit_2])
self.assertRaises(exception.RegisteredLimitError,
PROVIDERS.unified_limit_api.delete_registered_limit,
registered_limit_2['id'])
class LimitTests(object):
def test_create_limit(self):
# create one, return all limits
limit_1 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
res1 = PROVIDERS.unified_limit_api.create_limits([limit_1])
self.assertDictEqual(limit_1, res1[0])
# create another, return all limits
limit_2 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_two['id'],
resource_name='snapshot', resource_limit=5, id=uuid.uuid4().hex)
res2 = PROVIDERS.unified_limit_api.create_limits([limit_2])
for re in res2:
if re['id'] == limit_1['id']:
self.assertDictEqual(limit_1, re)
if re['id'] == limit_2['id']:
self.assertDictEqual(limit_2, re)
def test_create_limit_duplicate(self):
limit_1 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_limits([limit_1])
# use different id but the same project_id, service_id and region_id
limit_1 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
self.assertRaises(exception.Conflict,
PROVIDERS.unified_limit_api.create_limits,
[limit_1])
def test_create_limit_with_invalid_service_raises_validation_error(self):
limit = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=uuid.uuid4().hex,
region_id=self.region_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
self.assertRaises(exception.ValidationError,
PROVIDERS.unified_limit_api.create_limits,
[limit])
def test_create_limit_with_invalid_region_raises_validation_error(self):
limit = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=uuid.uuid4().hex,
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
self.assertRaises(exception.ValidationError,
PROVIDERS.unified_limit_api.create_limits,
[limit])
@test_utils.wip("Skipped until Bug 1744195 is resolved")
def test_create_limit_without_reference_registered_limit(self):
limit_1 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
self.assertRaises(exception.NoLimitReference,
PROVIDERS.unified_limit_api.create_limits,
[limit_1])
def test_update_limits(self):
# create two limits
limit_1 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
limit_2 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_two['id'],
resource_name='snapshot', resource_limit=5, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])
# update one, return all limits
limit_update = {'id': limit_1['id'],
'resource_limit': 8}
res = PROVIDERS.unified_limit_api.update_limits([limit_update])
for re in res:
if re['id'] == limit_1['id']:
self.assertEqual(8, re['resource_limit'])
if re['id'] == limit_2['id']:
self.assertDictEqual(limit_2, re)
def test_list_limits(self):
# create two limits
limit_1 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
limit_2 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_two['id'],
resource_name='snapshot', resource_limit=5, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])
# list
hints = driver_hints.Hints()
hints.add_filter('project_id', self.tenant_bar['id'])
limits = PROVIDERS.unified_limit_api.list_limits(hints)
self.assertEqual(2, len(limits))
for re in limits:
if re['id'] == limit_1['id']:
self.assertDictEqual(limit_1, re)
if re['id'] == limit_2['id']:
self.assertDictEqual(limit_2, re)
def test_list_limit_by_limit(self):
self.config_fixture.config(list_limit=1)
# create two limits
limit_1 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
limit_2 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_two['id'],
resource_name='snapshot', resource_limit=5, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])
# list, limit is 1
hints = driver_hints.Hints()
limits = PROVIDERS.unified_limit_api.list_limits(hints=hints)
self.assertEqual(1, len(limits))
if limits[0]['id'] == limit_1['id']:
self.assertDictEqual(limit_1, limits[0])
else:
self.assertDictEqual(limit_2, limits[0])
def test_list_limit_by_filter(self):
limit_1 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
limit_2 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_two['id'],
resource_name='snapshot', resource_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])
hints = driver_hints.Hints()
hints.add_filter('service_id', self.service_one['id'])
hints.add_filter('project_id', self.tenant_bar['id'])
res = PROVIDERS.unified_limit_api.list_limits(hints)
self.assertEqual(2, len(res))
hints = driver_hints.Hints()
hints.add_filter('region_id', self.region_one['id'])
hints.add_filter('project_id', self.tenant_bar['id'])
res = PROVIDERS.unified_limit_api.list_limits(hints)
self.assertEqual(1, len(res))
self.assertDictEqual(limit_1, res[0])
hints = driver_hints.Hints()
hints.add_filter('resource_name', 'backup')
hints.add_filter('project_id', self.tenant_bar['id'])
res = PROVIDERS.unified_limit_api.list_limits(hints)
self.assertEqual(0, len(res))
def test_get_limit(self):
# create two limits
limit_1 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
limit_2 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_two['id'],
resource_name='snapshot', resource_limit=5, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])
# show one
res = PROVIDERS.unified_limit_api.get_limit(limit_2['id'])
self.assertDictEqual(limit_2, res)
def test_get_limit_returns_not_found(self):
self.assertRaises(exception.LimitNotFound,
PROVIDERS.unified_limit_api.get_limit,
uuid.uuid4().hex)
def test_delete_limit(self):
# create two limits
limit_1 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
limit_2 = unit.new_limit_ref(
project_id=self.tenant_bar['id'],
service_id=self.service_one['id'],
region_id=self.region_two['id'],
resource_name='snapshot', resource_limit=5, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])
# delete one
PROVIDERS.unified_limit_api.delete_limit(limit_1['id'])
# delete again
self.assertRaises(exception.LimitNotFound,
PROVIDERS.unified_limit_api.get_limit,
limit_1['id'])
def test_delete_limit_returns_not_found(self):
self.assertRaises(exception.LimitNotFound,
PROVIDERS.unified_limit_api.delete_limit,
uuid.uuid4().hex)

View File

@ -26,6 +26,7 @@ from sqlalchemy import exc
from testtools import matchers
from keystone.common import driver_hints
from keystone.common import provider_api
from keystone.common import sql
import keystone.conf
from keystone.credential.providers import fernet as credential_provider
@ -39,6 +40,7 @@ from keystone.tests.unit import default_fixtures
from keystone.tests.unit.identity import test_backends as identity_tests
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import database
from keystone.tests.unit.limit import test_backends as limit_tests
from keystone.tests.unit.policy import test_backends as policy_tests
from keystone.tests.unit.resource import test_backends as resource_tests
from keystone.tests.unit.token import test_backends as token_tests
@ -48,6 +50,7 @@ from keystone.trust.backends import sql as trust_sql
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class SqlTests(unit.SQLDriverOverrides, unit.TestCase):
@ -1260,3 +1263,54 @@ class SqlCredential(SqlTests):
# Make sure the `blob` values listed from the API are not encrypted.
listed_credentials = self.credential_api.list_credentials()
self.assertIn(created_credential, listed_credentials)
class SqlRegisteredLimit(SqlTests, limit_tests.RegisteredLimitTests):
def setUp(self):
super(SqlRegisteredLimit, self).setUp()
fixtures_to_cleanup = []
for service in default_fixtures.SERVICES:
service_id = service['id']
rv = PROVIDERS.catalog_api.create_service(service_id, service)
attrname = service['extra']['name']
setattr(self, attrname, rv)
fixtures_to_cleanup.append(attrname)
for region in default_fixtures.REGIONS:
rv = PROVIDERS.catalog_api.create_region(region)
attrname = region['id']
setattr(self, attrname, rv)
fixtures_to_cleanup.append(attrname)
self.addCleanup(self.cleanup_instance(*fixtures_to_cleanup))
class SqlLimit(SqlTests, limit_tests.LimitTests):
def setUp(self):
super(SqlLimit, self).setUp()
fixtures_to_cleanup = []
for service in default_fixtures.SERVICES:
service_id = service['id']
rv = PROVIDERS.catalog_api.create_service(service_id, service)
attrname = service['extra']['name']
setattr(self, attrname, rv)
fixtures_to_cleanup.append(attrname)
for region in default_fixtures.REGIONS:
rv = PROVIDERS.catalog_api.create_region(region)
attrname = region['id']
setattr(self, attrname, rv)
fixtures_to_cleanup.append(attrname)
self.addCleanup(self.cleanup_instance(*fixtures_to_cleanup))
registered_limit_1 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_one['id'],
resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
registered_limit_2 = unit.new_registered_limit_ref(
service_id=self.service_one['id'],
region_id=self.region_two['id'],
resource_name='snapshot', default_limit=10, id=uuid.uuid4().hex)
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit_1, registered_limit_2])

View File

@ -159,6 +159,9 @@ keystone.token.provider =
keystone.trust =
sql = keystone.trust.backends.sql:Trust
keystone.unified_limit =
sql = keystone.limit.backends.sql:UnifiedLimit
keystone.endpoint_filter =
sql = keystone.catalog.backends.sql:Catalog