From 0b241dcea555ac559e2f9eb850b756cbdf8e77ca Mon Sep 17 00:00:00 2001
From: wangxiyuan <wangxiyuan@huawei.com>
Date: Thu, 30 Nov 2017 17:17:22 +0800
Subject: [PATCH] Add limit provider

This patch adds the registered limit and project limit provider
Class.

Change-Id: I636cd9555ae6434b0e6dec958ae41ef852a48285
bp: unified-limits
---
 keystone/conf/__init__.py                  |   3 +-
 keystone/conf/unified_limit.py             |  65 +++
 keystone/limit/__init__.py                 |  15 +
 keystone/limit/backends/base.py            |   5 +
 keystone/limit/core.py                     | 110 ++++
 keystone/server/backends.py                |   8 +-
 keystone/tests/unit/core.py                |  23 +
 keystone/tests/unit/default_fixtures.py    |   9 +
 keystone/tests/unit/limit/__init__.py      |   0
 keystone/tests/unit/limit/test_backends.py | 596 +++++++++++++++++++++
 keystone/tests/unit/test_backend_sql.py    |  54 ++
 setup.cfg                                  |   3 +
 12 files changed, 887 insertions(+), 4 deletions(-)
 create mode 100644 keystone/conf/unified_limit.py
 create mode 100644 keystone/limit/core.py
 create mode 100644 keystone/tests/unit/limit/__init__.py
 create mode 100644 keystone/tests/unit/limit/test_backends.py

diff --git a/keystone/conf/__init__.py b/keystone/conf/__init__.py
index 48e68f8092..1288a5f409 100644
--- a/keystone/conf/__init__.py
+++ b/keystone/conf/__init__.py
@@ -48,7 +48,7 @@ from keystone.conf import signing
 from keystone.conf import token
 from keystone.conf import tokenless_auth
 from keystone.conf import trust
-
+from keystone.conf import unified_limit
 
 CONF = cfg.CONF
 
@@ -83,6 +83,7 @@ conf_modules = [
     token,
     tokenless_auth,
     trust,
+    unified_limit,
 ]
 
 
diff --git a/keystone/conf/unified_limit.py b/keystone/conf/unified_limit.py
new file mode 100644
index 0000000000..1783e83d48
--- /dev/null
+++ b/keystone/conf/unified_limit.py
@@ -0,0 +1,65 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_config import cfg
+
+from keystone.conf import utils
+
+
+driver = cfg.StrOpt(
+    'driver',
+    default='sql',
+    help=utils.fmt("""
+Entry point for the unified limit backend driver in the
+`keystone.unified_limit` namespace. Keystone only provides a `sql` driver, so
+there's no reason to change this unless you are providing a custom entry point.
+"""))
+
+caching = cfg.BoolOpt(
+    'caching',
+    default=True,
+    help=utils.fmt("""
+Toggle for unified limit caching. This has no effect unless global caching is
+enabled. In a typical deployment, there is no reason to disable this.
+"""))
+
+cache_time = cfg.IntOpt(
+    'cache_time',
+    help=utils.fmt("""
+Time to cache unified limit data, in seconds. This has no effect unless both
+global caching and `[unified_limit] caching` are enabled.
+"""))
+
+list_limit = cfg.IntOpt(
+    'list_limit',
+    help=utils.fmt("""
+Maximum number of entities that will be returned in a role collection. This may
+be useful to tune if you have a large number of unified limits in your
+deployment.
+"""))
+
+
+GROUP_NAME = __name__.split('.')[-1]
+ALL_OPTS = [
+    driver,
+    caching,
+    cache_time,
+    list_limit,
+]
+
+
+def register_opts(conf):
+    conf.register_opts(ALL_OPTS, group=GROUP_NAME)
+
+
+def list_opts():
+    return {GROUP_NAME: ALL_OPTS}
diff --git a/keystone/limit/__init__.py b/keystone/limit/__init__.py
index e69de29bb2..0595e2c229 100644
--- a/keystone/limit/__init__.py
+++ b/keystone/limit/__init__.py
@@ -0,0 +1,15 @@
+# Copyright 2018 Huawei
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from keystone.limit.core import *  # noqa
diff --git a/keystone/limit/backends/base.py b/keystone/limit/backends/base.py
index 54444ffce0..f20b974fe8 100644
--- a/keystone/limit/backends/base.py
+++ b/keystone/limit/backends/base.py
@@ -18,15 +18,20 @@ import abc
 from oslo_log import log
 import six
 
+import keystone.conf
 from keystone import exception
 
 
 LOG = log.getLogger(__name__)
+CONF = keystone.conf.CONF
 
 
 @six.add_metaclass(abc.ABCMeta)
 class UnifiedLimitDriverBase(object):
 
+    def _get_list_limit(self):
+        return CONF.unified_limit.list_limit or CONF.list_limit
+
     @abc.abstractmethod
     def create_registered_limits(self, registered_limits):
         """Create new registered limits.
diff --git a/keystone/limit/core.py b/keystone/limit/core.py
new file mode 100644
index 0000000000..c8d8a8496a
--- /dev/null
+++ b/keystone/limit/core.py
@@ -0,0 +1,110 @@
+# Copyright 2018 Huawei
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from keystone.common import cache
+from keystone.common import driver_hints
+from keystone.common import manager
+from keystone.common import provider_api
+import keystone.conf
+from keystone import exception
+
+
+CONF = keystone.conf.CONF
+PROVIDERS = provider_api.ProviderAPIs
+
+MEMOIZE = cache.get_memoization_decorator(group='unified_limit')
+
+
+class Manager(manager.Manager):
+
+    driver_namespace = 'keystone.unified_limit'
+    _provides_api = 'unified_limit_api'
+
+    def __init__(self):
+        unified_limit_driver = CONF.unified_limit.driver
+        super(Manager, self).__init__(unified_limit_driver)
+
+    def _assert_resource_exist(self, unified_limit, target):
+        try:
+            service_id = unified_limit.get('service_id')
+            if service_id is not None:
+                PROVIDERS.catalog_api.get_service(service_id)
+            region_id = unified_limit.get('region_id')
+            if region_id is not None:
+                PROVIDERS.catalog_api.get_region(region_id)
+            project_id = unified_limit.get('project_id')
+            if project_id is not None:
+                PROVIDERS.resource_api.get_project(project_id)
+        except exception.ServiceNotFound:
+            raise exception.ValidationError(attribute='service_id',
+                                            target=target)
+        except exception.RegionNotFound:
+            raise exception.ValidationError(attribute='region_id',
+                                            target=target)
+        except exception.ProjectNotFound:
+            raise exception.ValidationError(attribute='project_id',
+                                            target=target)
+
+    def create_registered_limits(self, registered_limits):
+        for registered_limit in registered_limits:
+            self._assert_resource_exist(registered_limit, 'registered_limit')
+        self.driver.create_registered_limits(registered_limits)
+        return self.list_registered_limits()
+
+    def update_registered_limits(self, registered_limits):
+        for registered_limit in registered_limits:
+            self._assert_resource_exist(registered_limit, 'registered_limit')
+        self.driver.update_registered_limits(registered_limits)
+        for registered_limit in registered_limits:
+            self.get_registered_limit.invalidate(self, registered_limit['id'])
+        return self.list_registered_limits()
+
+    @manager.response_truncated
+    def list_registered_limits(self, hints=None):
+        return self.driver.list_registered_limits(
+            hints or driver_hints.Hints())
+
+    @MEMOIZE
+    def get_registered_limit(self, registered_limit_id):
+        return self.driver.get_registered_limit(registered_limit_id)
+
+    def delete_registered_limit(self, registered_limit_id):
+        self.driver.delete_registered_limit(registered_limit_id)
+        self.get_registered_limit.invalidate(self, registered_limit_id)
+
+    def create_limits(self, limits):
+        for limit in limits:
+            self._assert_resource_exist(limit, 'limit')
+        self.driver.create_limits(limits)
+        return self.list_limits()
+
+    def update_limits(self, limits):
+        for limit in limits:
+            self._assert_resource_exist(limit, 'limit')
+        self.driver.update_limits(limits)
+        for limit in limits:
+            self.get_limit.invalidate(self, limit['id'])
+        return self.list_limits()
+
+    @manager.response_truncated
+    def list_limits(self, hints=None):
+        return self.driver.list_limits(hints or driver_hints.Hints())
+
+    @MEMOIZE
+    def get_limit(self, limit_id):
+        return self.driver.get_limit(limit_id)
+
+    def delete_limit(self, limit_id):
+        self.driver.delete_limit(limit_id)
+        self.get_limit.invalidate(self, limit_id)
diff --git a/keystone/server/backends.py b/keystone/server/backends.py
index 797a052583..16dc267a60 100644
--- a/keystone/server/backends.py
+++ b/keystone/server/backends.py
@@ -20,6 +20,7 @@ from keystone import credential
 from keystone import endpoint_policy
 from keystone import federation
 from keystone import identity
+from keystone import limit
 from keystone import oauth1
 from keystone import policy
 from keystone import resource
@@ -45,9 +46,10 @@ def load_backends():
                 credential.provider.Manager, resource.DomainConfigManager,
                 endpoint_policy.Manager, federation.Manager,
                 identity.generator.Manager, identity.MappingManager,
-                identity.Manager, identity.ShadowUsersManager, oauth1.Manager,
-                policy.Manager, resource.Manager, revoke.Manager,
-                assignment.RoleManager, trust.Manager, token.provider.Manager,
+                identity.Manager, identity.ShadowUsersManager,
+                limit.Manager, oauth1.Manager, policy.Manager,
+                resource.Manager, revoke.Manager, assignment.RoleManager,
+                trust.Manager, token.provider.Manager,
                 persistence.PersistenceManager]
 
     drivers = {d._provides_api: d() for d in managers}
diff --git a/keystone/tests/unit/core.py b/keystone/tests/unit/core.py
index 5e3e69fa12..a7c53bd523 100644
--- a/keystone/tests/unit/core.py
+++ b/keystone/tests/unit/core.py
@@ -461,6 +461,29 @@ def new_trust_ref(trustor_user_id, trustee_user_id, project_id=None,
     return ref
 
 
+def new_registered_limit_ref(**kwargs):
+    ref = {
+        'service_id': uuid.uuid4().hex,
+        'resource_name': uuid.uuid4().hex,
+        'default_limit': 10
+    }
+
+    ref.update(kwargs)
+    return ref
+
+
+def new_limit_ref(**kwargs):
+    ref = {
+        'project_id': uuid.uuid4().hex,
+        'service_id': uuid.uuid4().hex,
+        'resource_name': uuid.uuid4().hex,
+        'resource_limit': 10
+    }
+
+    ref.update(kwargs)
+    return ref
+
+
 def create_user(api, domain_id, **kwargs):
     """Create a user via the API. Keep the created password.
 
diff --git a/keystone/tests/unit/default_fixtures.py b/keystone/tests/unit/default_fixtures.py
index f3cf5b630d..d37a06ca0c 100644
--- a/keystone/tests/unit/default_fixtures.py
+++ b/keystone/tests/unit/default_fixtures.py
@@ -164,3 +164,12 @@ DOMAINS = [{'description':
             'enabled': True,
             'id': DEFAULT_DOMAIN_ID,
             'name': u'Default'}]
+
+SERVICES = [{'id': uuid.uuid4().hex,
+             'type': 'type_one',
+             'enabled': True,
+             'extra': {'description': 'This is a service for test.',
+                       'name': 'service_one'}
+             }]
+
+REGIONS = [{'id': 'region_one'}, {'id': 'region_two'}]
diff --git a/keystone/tests/unit/limit/__init__.py b/keystone/tests/unit/limit/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/keystone/tests/unit/limit/test_backends.py b/keystone/tests/unit/limit/test_backends.py
new file mode 100644
index 0000000000..315771ee0d
--- /dev/null
+++ b/keystone/tests/unit/limit/test_backends.py
@@ -0,0 +1,596 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from keystone.common import driver_hints
+from keystone.common import provider_api
+from keystone import exception
+from keystone.tests import unit
+from keystone.tests.unit import utils as test_utils
+
+PROVIDERS = provider_api.ProviderAPIs
+
+
+class RegisteredLimitTests(object):
+
+    def test_create_registered_limit_crud(self):
+        # create one, return all registered_limits
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        res1 = PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1])
+        self.assertDictEqual(registered_limit_1, res1[0])
+
+        # create another, return all registered_limits
+        registered_limit_2 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='snapshot', default_limit=5, id=uuid.uuid4().hex)
+        res2 = PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_2])
+        self.assertEqual(2, len(res2))
+        for re in res2:
+            if re['id'] == registered_limit_1['id']:
+                self.assertDictEqual(registered_limit_1, re)
+            if re['id'] == registered_limit_2['id']:
+                self.assertDictEqual(registered_limit_2, re)
+
+    def test_create_registered_limit_duplicate(self):
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1])
+
+        registered_limit_2 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        self.assertRaises(exception.Conflict,
+                          PROVIDERS.unified_limit_api.create_registered_limits,
+                          [registered_limit_2])
+
+    def test_create_multi_registered_limits_duplicate(self):
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1])
+
+        # Create with a duplicated one and a normal one. Both of them will not
+        # be created.
+        registered_limit_2 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        registered_limit_3 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='snapshot', default_limit=10, id=uuid.uuid4().hex)
+        self.assertRaises(exception.Conflict,
+                          PROVIDERS.unified_limit_api.create_registered_limits,
+                          [registered_limit_2, registered_limit_3])
+
+        reg_limits = PROVIDERS.unified_limit_api.list_registered_limits()
+        self.assertEqual(1, len(reg_limits))
+        self.assertEqual(registered_limit_1['id'], reg_limits[0]['id'])
+
+    def test_create_registered_limit_invalid_service(self):
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=uuid.uuid4().hex,
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        self.assertRaises(exception.ValidationError,
+                          PROVIDERS.unified_limit_api.create_registered_limits,
+                          [registered_limit_1])
+
+    def test_create_registered_limit_invalid_region(self):
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=uuid.uuid4().hex,
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        self.assertRaises(exception.ValidationError,
+                          PROVIDERS.unified_limit_api.create_registered_limits,
+                          [registered_limit_1])
+
+    def test_update_registered_limit(self):
+        # create two registered limits
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        registered_limit_2 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='snapshot', default_limit=5, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1, registered_limit_2])
+
+        # update one, return all registered_limits
+        registered_limit_update = {'id': registered_limit_1['id'],
+                                   'region_id': 'region_two'}
+        res = PROVIDERS.unified_limit_api.update_registered_limits(
+            [registered_limit_update])
+        for re in res:
+            if re['id'] == registered_limit_1['id']:
+                self.assertEqual('region_two', re['region_id'])
+            if re['id'] == registered_limit_2['id']:
+                self.assertDictEqual(registered_limit_2, re)
+
+    def test_update_registered_limit_invalid_input_return_bad_request(self):
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1])
+
+        update_ref = {'id': registered_limit_1['id'],
+                      'service_id': uuid.uuid4().hex}
+        self.assertRaises(exception.ValidationError,
+                          PROVIDERS.unified_limit_api.update_registered_limits,
+                          [update_ref])
+
+        update_ref = {'id': registered_limit_1['id'],
+                      'region_id': 'fake_id'}
+        self.assertRaises(exception.ValidationError,
+                          PROVIDERS.unified_limit_api.update_registered_limits,
+                          [update_ref])
+
+    def test_update_registered_limit_duplicate(self):
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        registered_limit_2 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_two['id'],
+            resource_name='snapshot', default_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1, registered_limit_2])
+
+        # Update registered_limit1 to registered_limit2
+        update_ref = {'id': registered_limit_1['id'],
+                      'region_id': self.region_two['id'],
+                      'resource_name': 'snapshot'}
+        self.assertRaises(exception.Conflict,
+                          PROVIDERS.unified_limit_api.update_registered_limits,
+                          [update_ref])
+
+    @test_utils.wip("Skipped until Bug 1744195 is resolved")
+    def test_update_registered_limit_when_reference_limit_exist(self):
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1])
+        limit_1 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_limits([limit_1])
+
+        registered_limit_update = {'id': registered_limit_1['id'],
+                                   'region_id': 'region_two'}
+
+        self.assertRaises(exception.RegisteredLimitError,
+                          PROVIDERS.unified_limit_api.update_registered_limits,
+                          [registered_limit_update])
+
+        registered_limit_2 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_2])
+        limit_2 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_limits([limit_2])
+
+        registered_limit_update = {'id': registered_limit_2['id'],
+                                   'region_id': 'region_two'}
+
+        self.assertRaises(exception.RegisteredLimitError,
+                          PROVIDERS.unified_limit_api.update_registered_limits,
+                          [registered_limit_update])
+
+    def test_list_registered_limits(self):
+        # create two registered limits
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        registered_limit_2 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='snapshot', default_limit=5, id=uuid.uuid4().hex)
+        reg_limits_1 = PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1, registered_limit_2])
+
+        # list
+        reg_limits_2 = PROVIDERS.unified_limit_api.list_registered_limits()
+        self.assertEqual(2, len(reg_limits_2))
+        self.assertDictEqual(reg_limits_1[0], reg_limits_2[0])
+        self.assertDictEqual(reg_limits_1[1], reg_limits_2[1])
+
+    def test_list_registered_limit_by_limit(self):
+        self.config_fixture.config(list_limit=1)
+        # create two registered limits
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        registered_limit_2 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='snapshot', default_limit=5, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1, registered_limit_2])
+
+        # list, limit is 1
+        hints = driver_hints.Hints()
+        reg_limits = PROVIDERS.unified_limit_api.list_registered_limits(
+            hints=hints)
+        self.assertEqual(1, len(reg_limits))
+
+        if reg_limits[0]['id'] == registered_limit_1['id']:
+            self.assertDictEqual(registered_limit_1, reg_limits[0])
+        else:
+            self.assertDictEqual(registered_limit_2, reg_limits[0])
+
+    def test_list_registered_limit_by_filter(self):
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        registered_limit_2 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_two['id'],
+            resource_name='snapshot', default_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1, registered_limit_2])
+
+        hints = driver_hints.Hints()
+        hints.add_filter('service_id', self.service_one['id'])
+        res = PROVIDERS.unified_limit_api.list_registered_limits(hints)
+        self.assertEqual(2, len(res))
+
+        hints = driver_hints.Hints()
+        hints.add_filter('region_id', self.region_one['id'])
+        res = PROVIDERS.unified_limit_api.list_registered_limits(hints)
+        self.assertEqual(1, len(res))
+
+        hints = driver_hints.Hints()
+        hints.add_filter('resource_name', 'backup')
+        res = PROVIDERS.unified_limit_api.list_registered_limits(hints)
+        self.assertEqual(0, len(res))
+
+    def test_get_registered_limit(self):
+        # create two registered limits
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        registered_limit_2 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='snapshot', default_limit=5, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1, registered_limit_2])
+
+        # show one
+        res = PROVIDERS.unified_limit_api.get_registered_limit(
+            registered_limit_2['id'])
+        self.assertDictEqual(registered_limit_2, res)
+
+    def test_get_registered_limit_returns_not_found(self):
+        self.assertRaises(exception.RegisteredLimitNotFound,
+                          PROVIDERS.unified_limit_api.get_registered_limit,
+                          uuid.uuid4().hex)
+
+    def test_delete_registered_limit(self):
+        # create two registered limits
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        registered_limit_2 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='snapshot', default_limit=5, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1, registered_limit_2])
+
+        # delete one
+        PROVIDERS.unified_limit_api.delete_registered_limit(
+            registered_limit_1['id'])
+        self.assertRaises(exception.RegisteredLimitNotFound,
+                          PROVIDERS.unified_limit_api.get_registered_limit,
+                          registered_limit_1['id'])
+        reg_limits = PROVIDERS.unified_limit_api.list_registered_limits()
+        self.assertEqual(1, len(reg_limits))
+        self.assertEqual(registered_limit_2['id'], reg_limits[0]['id'])
+
+    def test_delete_registered_limit_returns_not_found(self):
+        self.assertRaises(exception.RegisteredLimitNotFound,
+                          PROVIDERS.unified_limit_api.delete_registered_limit,
+                          uuid.uuid4().hex)
+
+    @test_utils.wip("Skipped until Bug 1744195 is resolved")
+    def test_delete_registered_limit_when_reference_limit_exist(self):
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1])
+        limit_1 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_limits([limit_1])
+
+        self.assertRaises(exception.RegisteredLimitError,
+                          PROVIDERS.unified_limit_api.delete_registered_limit,
+                          registered_limit_1['id'])
+
+        registered_limit_2 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_2])
+        limit_2 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_limits([limit_2])
+
+        self.assertRaises(exception.RegisteredLimitError,
+                          PROVIDERS.unified_limit_api.delete_registered_limit,
+                          registered_limit_2['id'])
+
+
+class LimitTests(object):
+
+    def test_create_limit(self):
+        # create one, return all limits
+        limit_1 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        res1 = PROVIDERS.unified_limit_api.create_limits([limit_1])
+        self.assertDictEqual(limit_1, res1[0])
+
+        # create another, return all limits
+        limit_2 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_two['id'],
+            resource_name='snapshot', resource_limit=5, id=uuid.uuid4().hex)
+        res2 = PROVIDERS.unified_limit_api.create_limits([limit_2])
+        for re in res2:
+            if re['id'] == limit_1['id']:
+                self.assertDictEqual(limit_1, re)
+            if re['id'] == limit_2['id']:
+                self.assertDictEqual(limit_2, re)
+
+    def test_create_limit_duplicate(self):
+        limit_1 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_limits([limit_1])
+
+        # use different id but the same project_id, service_id and region_id
+        limit_1 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        self.assertRaises(exception.Conflict,
+                          PROVIDERS.unified_limit_api.create_limits,
+                          [limit_1])
+
+    def test_create_limit_with_invalid_service_raises_validation_error(self):
+        limit = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=uuid.uuid4().hex,
+            region_id=self.region_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        self.assertRaises(exception.ValidationError,
+                          PROVIDERS.unified_limit_api.create_limits,
+                          [limit])
+
+    def test_create_limit_with_invalid_region_raises_validation_error(self):
+        limit = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=uuid.uuid4().hex,
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        self.assertRaises(exception.ValidationError,
+                          PROVIDERS.unified_limit_api.create_limits,
+                          [limit])
+
+    @test_utils.wip("Skipped until Bug 1744195 is resolved")
+    def test_create_limit_without_reference_registered_limit(self):
+        limit_1 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        self.assertRaises(exception.NoLimitReference,
+                          PROVIDERS.unified_limit_api.create_limits,
+                          [limit_1])
+
+    def test_update_limits(self):
+        # create two limits
+        limit_1 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        limit_2 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_two['id'],
+            resource_name='snapshot', resource_limit=5, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])
+
+        # update one, return all limits
+        limit_update = {'id': limit_1['id'],
+                        'resource_limit': 8}
+        res = PROVIDERS.unified_limit_api.update_limits([limit_update])
+        for re in res:
+            if re['id'] == limit_1['id']:
+                self.assertEqual(8, re['resource_limit'])
+            if re['id'] == limit_2['id']:
+                self.assertDictEqual(limit_2, re)
+
+    def test_list_limits(self):
+        # create two limits
+        limit_1 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        limit_2 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_two['id'],
+            resource_name='snapshot', resource_limit=5, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])
+
+        # list
+        hints = driver_hints.Hints()
+        hints.add_filter('project_id', self.tenant_bar['id'])
+        limits = PROVIDERS.unified_limit_api.list_limits(hints)
+        self.assertEqual(2, len(limits))
+        for re in limits:
+            if re['id'] == limit_1['id']:
+                self.assertDictEqual(limit_1, re)
+            if re['id'] == limit_2['id']:
+                self.assertDictEqual(limit_2, re)
+
+    def test_list_limit_by_limit(self):
+        self.config_fixture.config(list_limit=1)
+        # create two limits
+        limit_1 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        limit_2 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_two['id'],
+            resource_name='snapshot', resource_limit=5, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])
+
+        # list, limit is 1
+        hints = driver_hints.Hints()
+        limits = PROVIDERS.unified_limit_api.list_limits(hints=hints)
+        self.assertEqual(1, len(limits))
+        if limits[0]['id'] == limit_1['id']:
+            self.assertDictEqual(limit_1, limits[0])
+        else:
+            self.assertDictEqual(limit_2, limits[0])
+
+    def test_list_limit_by_filter(self):
+        limit_1 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        limit_2 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_two['id'],
+            resource_name='snapshot', resource_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])
+
+        hints = driver_hints.Hints()
+        hints.add_filter('service_id', self.service_one['id'])
+        hints.add_filter('project_id', self.tenant_bar['id'])
+        res = PROVIDERS.unified_limit_api.list_limits(hints)
+        self.assertEqual(2, len(res))
+
+        hints = driver_hints.Hints()
+        hints.add_filter('region_id', self.region_one['id'])
+        hints.add_filter('project_id', self.tenant_bar['id'])
+        res = PROVIDERS.unified_limit_api.list_limits(hints)
+        self.assertEqual(1, len(res))
+        self.assertDictEqual(limit_1, res[0])
+
+        hints = driver_hints.Hints()
+        hints.add_filter('resource_name', 'backup')
+        hints.add_filter('project_id', self.tenant_bar['id'])
+        res = PROVIDERS.unified_limit_api.list_limits(hints)
+        self.assertEqual(0, len(res))
+
+    def test_get_limit(self):
+        # create two limits
+        limit_1 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        limit_2 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_two['id'],
+            resource_name='snapshot', resource_limit=5, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])
+
+        # show one
+        res = PROVIDERS.unified_limit_api.get_limit(limit_2['id'])
+        self.assertDictEqual(limit_2, res)
+
+    def test_get_limit_returns_not_found(self):
+        self.assertRaises(exception.LimitNotFound,
+                          PROVIDERS.unified_limit_api.get_limit,
+                          uuid.uuid4().hex)
+
+    def test_delete_limit(self):
+        # create two limits
+        limit_1 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', resource_limit=10, id=uuid.uuid4().hex)
+        limit_2 = unit.new_limit_ref(
+            project_id=self.tenant_bar['id'],
+            service_id=self.service_one['id'],
+            region_id=self.region_two['id'],
+            resource_name='snapshot', resource_limit=5, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_limits([limit_1, limit_2])
+        # delete one
+        PROVIDERS.unified_limit_api.delete_limit(limit_1['id'])
+        # delete again
+        self.assertRaises(exception.LimitNotFound,
+                          PROVIDERS.unified_limit_api.get_limit,
+                          limit_1['id'])
+
+    def test_delete_limit_returns_not_found(self):
+        self.assertRaises(exception.LimitNotFound,
+                          PROVIDERS.unified_limit_api.delete_limit,
+                          uuid.uuid4().hex)
diff --git a/keystone/tests/unit/test_backend_sql.py b/keystone/tests/unit/test_backend_sql.py
index fc9806af10..735c04871d 100644
--- a/keystone/tests/unit/test_backend_sql.py
+++ b/keystone/tests/unit/test_backend_sql.py
@@ -26,6 +26,7 @@ from sqlalchemy import exc
 from testtools import matchers
 
 from keystone.common import driver_hints
+from keystone.common import provider_api
 from keystone.common import sql
 import keystone.conf
 from keystone.credential.providers import fernet as credential_provider
@@ -39,6 +40,7 @@ from keystone.tests.unit import default_fixtures
 from keystone.tests.unit.identity import test_backends as identity_tests
 from keystone.tests.unit import ksfixtures
 from keystone.tests.unit.ksfixtures import database
+from keystone.tests.unit.limit import test_backends as limit_tests
 from keystone.tests.unit.policy import test_backends as policy_tests
 from keystone.tests.unit.resource import test_backends as resource_tests
 from keystone.tests.unit.token import test_backends as token_tests
@@ -48,6 +50,7 @@ from keystone.trust.backends import sql as trust_sql
 
 
 CONF = keystone.conf.CONF
+PROVIDERS = provider_api.ProviderAPIs
 
 
 class SqlTests(unit.SQLDriverOverrides, unit.TestCase):
@@ -1260,3 +1263,54 @@ class SqlCredential(SqlTests):
         # Make sure the `blob` values listed from the API are not encrypted.
         listed_credentials = self.credential_api.list_credentials()
         self.assertIn(created_credential, listed_credentials)
+
+
+class SqlRegisteredLimit(SqlTests, limit_tests.RegisteredLimitTests):
+
+    def setUp(self):
+        super(SqlRegisteredLimit, self).setUp()
+
+        fixtures_to_cleanup = []
+        for service in default_fixtures.SERVICES:
+            service_id = service['id']
+            rv = PROVIDERS.catalog_api.create_service(service_id, service)
+            attrname = service['extra']['name']
+            setattr(self, attrname, rv)
+            fixtures_to_cleanup.append(attrname)
+        for region in default_fixtures.REGIONS:
+            rv = PROVIDERS.catalog_api.create_region(region)
+            attrname = region['id']
+            setattr(self, attrname, rv)
+            fixtures_to_cleanup.append(attrname)
+        self.addCleanup(self.cleanup_instance(*fixtures_to_cleanup))
+
+
+class SqlLimit(SqlTests, limit_tests.LimitTests):
+
+    def setUp(self):
+        super(SqlLimit, self).setUp()
+
+        fixtures_to_cleanup = []
+        for service in default_fixtures.SERVICES:
+            service_id = service['id']
+            rv = PROVIDERS.catalog_api.create_service(service_id, service)
+            attrname = service['extra']['name']
+            setattr(self, attrname, rv)
+            fixtures_to_cleanup.append(attrname)
+        for region in default_fixtures.REGIONS:
+            rv = PROVIDERS.catalog_api.create_region(region)
+            attrname = region['id']
+            setattr(self, attrname, rv)
+            fixtures_to_cleanup.append(attrname)
+        self.addCleanup(self.cleanup_instance(*fixtures_to_cleanup))
+
+        registered_limit_1 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_one['id'],
+            resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
+        registered_limit_2 = unit.new_registered_limit_ref(
+            service_id=self.service_one['id'],
+            region_id=self.region_two['id'],
+            resource_name='snapshot', default_limit=10, id=uuid.uuid4().hex)
+        PROVIDERS.unified_limit_api.create_registered_limits(
+            [registered_limit_1, registered_limit_2])
diff --git a/setup.cfg b/setup.cfg
index 9fd9ea281e..81e3d82c24 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -159,6 +159,9 @@ keystone.token.provider =
 keystone.trust =
     sql = keystone.trust.backends.sql:Trust
 
+keystone.unified_limit =
+    sql = keystone.limit.backends.sql:UnifiedLimit
+
 keystone.endpoint_filter =
     sql = keystone.catalog.backends.sql:Catalog