diff --git a/openstack/tests/functional/base.py b/openstack/tests/functional/base.py index a3e948e6d..f42f719ff 100644 --- a/openstack/tests/functional/base.py +++ b/openstack/tests/functional/base.py @@ -50,6 +50,16 @@ class BaseFunctionalTest(base.TestCase): def setUp(self): super().setUp() + self._system_admin_name = os.environ.get( + 'OPENSTACKSDK_SYSTEM_ADMIN_CLOUD', + 'devstack-system-admin', + ) + if not self._system_admin_name: + raise self.failureException( + "OPENSTACKSDK_SYSTEM_ADMIN_CLOUD must be set to a non-empty " + "value" + ) + self.config = openstack.config.OpenStackConfig() self._user_cloud_name = os.environ.get( @@ -112,6 +122,14 @@ class BaseFunctionalTest(base.TestCase): self.operator_cloud = connection.Connection(config=operator_config) _disable_keep_alive(self.operator_cloud) + system_admin_config = self.config.get_one( + cloud=self._system_admin_name, **kwargs + ) + self.system_admin_cloud = connection.Connection( + config=system_admin_config + ) + _disable_keep_alive(self.system_admin_cloud) + def _pick_flavor(self): """Pick a sensible flavor to run tests with. diff --git a/openstack/tests/functional/identity/v3/test_limit.py b/openstack/tests/functional/identity/v3/test_limit.py new file mode 100644 index 000000000..b49884fbc --- /dev/null +++ b/openstack/tests/functional/identity/v3/test_limit.py @@ -0,0 +1,98 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from openstack.identity.v3 import limit as _limit +from openstack.tests.functional import base + + +class TestLimit(base.BaseFunctionalTest): + def setUp(self): + super().setUp() + + self.service_name = self.getUniqueString('service') + self.service_type = self.getUniqueString('type') + self.service = self.system_admin_cloud.identity.create_service( + name=self.service_name, + type=self.service_type, + ) + self.addCleanup( + self.system_admin_cloud.identity.delete_service, self.service + ) + + self.resource_name = self.getUniqueString('resource') + self.registered_limit = ( + self.system_admin_cloud.identity.create_registered_limit( + resource_name=self.resource_name, + service_id=self.service.id, + default_limit=10, + ) + ) + self.addCleanup( + self.system_admin_cloud.identity.delete_registered_limit, + self.registered_limit, + ) + + self.project_name = self.getUniqueString('project') + self.project = self.system_admin_cloud.identity.create_project( + name=self.project_name, + ) + self.addCleanup( + self.system_admin_cloud.identity.delete_project, self.project + ) + + self.limit_description = self.getUniqueString('limit') + + def _delete_limit(self, limit): + ret = self.system_admin_cloud.identity.delete_limit(limit) + self.assertIsNone(ret) + + def test_limit(self): + # create the limit + + limit = self.system_admin_cloud.identity.create_limit( + resource_name=self.resource_name, + service_id=self.service.id, + project_id=self.project.id, + resource_limit=50, + ) + self.addCleanup(self._delete_limit, limit) + self.assertIsInstance(limit, _limit.Limit) + self.assertIsNotNone(limit.id) + self.assertIsNone(limit.description) + self.assertEqual(self.service.id, limit.service_id) + self.assertEqual(self.project.id, limit.project_id) + self.assertEqual(50, limit.resource_limit) + + # update the limit + + limit = self.system_admin_cloud.identity.update_limit( + limit, description=self.limit_description + ) + self.assertIsInstance(limit, _limit.Limit) + self.assertEqual(self.limit_description, limit.description) + + # retrieve details of the (updated) limit by ID + + limit = self.system_admin_cloud.identity.get_limit(limit.id) + self.assertIsInstance(limit, _limit.Limit) + self.assertEqual(self.limit_description, limit.description) + + # (there's no name, so no way to retrieve by name) + + # list all limits + + limits = list(self.system_admin_cloud.identity.limits()) + self.assertIsInstance(limits[0], _limit.Limit) + self.assertIn( + self.resource_name, + {x.resource_name for x in limits}, + ) diff --git a/openstack/tests/functional/identity/v3/test_projects.py b/openstack/tests/functional/identity/v3/test_project.py similarity index 100% rename from openstack/tests/functional/identity/v3/test_projects.py rename to openstack/tests/functional/identity/v3/test_project.py diff --git a/openstack/tests/functional/identity/v3/test_registered_limit.py b/openstack/tests/functional/identity/v3/test_registered_limit.py new file mode 100644 index 000000000..6701cfc15 --- /dev/null +++ b/openstack/tests/functional/identity/v3/test_registered_limit.py @@ -0,0 +1,111 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from openstack.identity.v3 import registered_limit as _registered_limit +from openstack.tests.functional import base + + +class TestRegisteredLimit(base.BaseFunctionalTest): + def setUp(self): + super().setUp() + + self.region_name = self.getUniqueString('region') + self.region = self.system_admin_cloud.identity.create_region( + name=self.region_name + ) + self.addCleanup( + self.system_admin_cloud.identity.delete_region, self.region + ) + + self.service_name = self.getUniqueString('service') + self.service_type = self.getUniqueString('type') + self.service = self.system_admin_cloud.identity.create_service( + name=self.service_name, + type=self.service_type, + ) + self.addCleanup( + self.system_admin_cloud.identity.delete_service, self.service + ) + + self.resource_name = self.getUniqueString('resource') + self.registered_limit_description = self.getUniqueString( + 'registered_limit' + ) + + def _delete_registered_limit(self, registered_limit): + ret = self.system_admin_cloud.identity.delete_registered_limit( + registered_limit + ) + self.assertIsNone(ret) + + def test_registered_limit(self): + # create the registered limit + + registered_limit = ( + self.system_admin_cloud.identity.create_registered_limit( + resource_name=self.resource_name, + service_id=self.service.id, + region_id=self.region.id, + default_limit=10, + ) + ) + self.addCleanup(self._delete_registered_limit, registered_limit) + self.assertIsInstance( + registered_limit, _registered_limit.RegisteredLimit + ) + self.assertIsNotNone(registered_limit.id) + self.assertIsNone(registered_limit.description) + self.assertEqual(self.service.id, registered_limit.service_id) + self.assertEqual(self.region.id, registered_limit.region_id) + + # update the registered limit + + registered_limit = ( + self.system_admin_cloud.identity.update_registered_limit( + registered_limit, description=self.registered_limit_description + ) + ) + self.assertIsInstance( + registered_limit, _registered_limit.RegisteredLimit + ) + self.assertEqual( + self.registered_limit_description, registered_limit.description + ) + + # retrieve details of the (updated) registered limit by ID + + registered_limit = ( + self.system_admin_cloud.identity.get_registered_limit( + registered_limit.id + ) + ) + self.assertIsInstance( + registered_limit, _registered_limit.RegisteredLimit + ) + self.assertEqual( + self.registered_limit_description, registered_limit.description + ) + + # (there's no name, so no way to retrieve by name) + + # list all registered limits + + registered_limits = list( + self.system_admin_cloud.identity.registered_limits() + ) + self.assertIsInstance( + registered_limits[0], _registered_limit.RegisteredLimit + ) + self.assertIn( + self.resource_name, + {x.resource_name for x in registered_limits}, + )