Identity v3 Domain Configuration Client

Adds a new client lib to handle the domain configuration API.

The domain configuration API is part of the standard
keystone v3 API [0].

This patch also adds unit tests and API tests for the domain
configuration client. Most of its APIs are RULE_ADMIN_REQUIRED
[1] and so the API tests are included in the admin namespace.

[0] https://developer.openstack.org/api-ref/identity/v3/
[1] https://github.com/openstack/keystone/blob/master/keystone/common/policies/domain_config.py

Change-Id: I2f6229076aa7d2939dd91c487085ea73de001403
This commit is contained in:
Felipe Monteiro 2017-04-03 22:04:06 +01:00
parent dbffd22f44
commit 94d8577365
7 changed files with 605 additions and 6 deletions

View File

@ -0,0 +1,5 @@
---
features:
- |
Add a new client to handle the domain configuration feature from the
identity v3 API.

View File

@ -0,0 +1,184 @@
# Copyright 2017 AT&T Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.identity import base
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
class DomainConfigurationTestJSON(base.BaseIdentityV3AdminTest):
custom_config = {
"identity": {
"driver": "ldap"
},
"ldap": {
"url": "ldap://myldap.com:389/",
"user_tree_dn": "ou=Users,dc=my_new_root,dc=org"
}
}
@classmethod
def setup_clients(cls):
super(DomainConfigurationTestJSON, cls).setup_clients()
cls.client = cls.domain_config_client
@classmethod
def resource_setup(cls):
super(DomainConfigurationTestJSON, cls).resource_setup()
cls.group = cls.groups_client.create_group(
name=data_utils.rand_name('group'),
description=data_utils.rand_name('group-desc'))['group']
@classmethod
def resource_cleanup(cls):
cls.groups_client.delete_group(cls.group['id'])
super(DomainConfigurationTestJSON, cls).resource_cleanup()
def _create_domain_and_config(self, config):
domain = self.setup_test_domain()
config = self.client.create_domain_config(domain['id'], **config)[
'config']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.client.delete_domain_config, domain['id'])
return domain, config
@decorators.idempotent_id('11a02bf0-6f94-4380-b3b0-c8dc18fc0d22')
def test_show_default_group_config_and_options(self):
# The API supports only the identity and ldap groups. For the ldap
# group, a valid value is url or user_tree_dn. For the identity group,
# a valid value is driver.
# Check that the default config has the identity and ldap groups.
config = self.client.show_default_config_settings()['config']
self.assertIsInstance(config, dict)
self.assertIn('identity', config)
self.assertIn('ldap', config)
# Check that the identity group is correct.
identity_config = self.client.show_default_group_config('identity')[
'config']
self.assertIsInstance(identity_config, dict)
self.assertIn('identity', identity_config)
self.assertIn('driver', identity_config['identity'])
self.assertIn('list_limit', identity_config['identity'])
# Show each option for the default domain and identity group.
for config_opt_name in ['driver', 'list_limit']:
retrieved_config_opt = self.client.show_default_group_option(
'identity', config_opt_name)['config']
self.assertIn(config_opt_name, retrieved_config_opt)
# Check that the ldap group is correct.
ldap_config = self.client.show_default_group_config('ldap')['config']
self.assertIsInstance(ldap_config, dict)
self.assertIn('ldap', ldap_config)
# Several valid options exist for ldap group.
valid_options = ldap_config['ldap'].keys()
# Show each option for the default domain and ldap group.
for config_opt_name in valid_options:
retrieved_config_opt = self.client.show_default_group_option(
'ldap', config_opt_name)['config']
self.assertIn(config_opt_name, retrieved_config_opt)
@decorators.idempotent_id('9e3ff13c-f597-4f01-9377-d6c06c2a1477')
def test_create_domain_config_and_show_config_groups_and_options(self):
domain, created_config = self._create_domain_and_config(
self.custom_config)
# Check that the entire configuration is correct.
self.assertEqual(self.custom_config, created_config)
# Check that each configuration group is correct.
for group_name in self.custom_config.keys():
group_cfg = self.client.show_domain_group_config(
domain['id'], group_name)['config']
self.assertIn(group_name, group_cfg)
self.assertEqual(self.custom_config[group_name],
group_cfg[group_name])
# Check that each configuration option is correct.
for opt_name in self.custom_config[group_name].keys():
group_opt = self.client.show_domain_group_option_config(
domain['id'], group_name, opt_name)['config']
self.assertIn(opt_name, group_opt)
self.assertEqual(self.custom_config[group_name][opt_name],
group_opt[opt_name])
@decorators.idempotent_id('7161023e-5dd0-4612-9da0-1bac6ac30b63')
def test_create_update_and_delete_domain_config(self):
domain, created_config = self._create_domain_and_config(
self.custom_config)
new_config = created_config
new_config['ldap']['url'] = data_utils.rand_url()
# Check that the altered configuration is reflected in updated_config.
updated_config = self.client.update_domain_config(
domain['id'], **new_config)['config']
self.assertEqual(new_config, updated_config)
# Check that showing the domain config shows the altered configuration.
retrieved_config = self.client.show_domain_config(domain['id'])[
'config']
self.assertEqual(new_config, retrieved_config)
# Check that deleting a configuration works.
self.client.delete_domain_config(domain['id'])
self.assertRaises(lib_exc.NotFound, self.client.show_domain_config,
domain['id'])
@decorators.idempotent_id('c7510fa2-6661-4170-9c6b-4783a80651e9')
def test_create_update_and_delete_domain_config_groups_and_opts(self):
domain, _ = self._create_domain_and_config(self.custom_config)
# Check that updating configuration groups work.
new_driver = data_utils.rand_name('driver')
new_limit = data_utils.rand_int_id(0, 100)
new_group_config = {'identity': {'driver': new_driver,
'list_limit': new_limit}}
updated_config = self.client.update_domain_group_config(
domain['id'], 'identity', **new_group_config)['config']
self.assertEqual(new_driver, updated_config['identity']['driver'])
self.assertEqual(new_limit, updated_config['identity']['list_limit'])
# Check that updating individual configuration group options work.
new_driver = data_utils.rand_name('driver')
updated_config = self.client.update_domain_group_option_config(
domain['id'], 'identity', 'driver', driver=new_driver)['config']
self.assertEqual(new_driver, updated_config['identity']['driver'])
# Check that deleting individual configuration group options work.
self.client.delete_domain_group_option_config(
domain['id'], 'identity', 'driver')
self.assertRaises(lib_exc.NotFound,
self.client.show_domain_group_option_config,
domain['id'], 'identity', 'driver')
# Check that deleting configuration groups work.
self.client.delete_domain_group_config(domain['id'], 'identity')
self.assertRaises(lib_exc.NotFound,
self.client.show_domain_group_config,
domain['id'], 'identity')

View File

@ -213,6 +213,7 @@ class BaseIdentityV3AdminTest(BaseIdentityV3Test):
cls.projects_client = cls.os_adm.projects_client cls.projects_client = cls.os_adm.projects_client
cls.role_assignments = cls.os_admin.role_assignments_client cls.role_assignments = cls.os_admin.role_assignments_client
cls.oauth_consumers_client = cls.os_adm.oauth_consumers_client cls.oauth_consumers_client = cls.os_adm.oauth_consumers_client
cls.domain_config_client = cls.os_adm.domain_config_client
if CONF.identity.admin_domain_scope: if CONF.identity.admin_domain_scope:
# NOTE(andreaf) When keystone policy requires it, the identity # NOTE(andreaf) When keystone policy requires it, the identity
# admin clients for these tests shall use 'domain' scoped tokens. # admin clients for these tests shall use 'domain' scoped tokens.

View File

@ -231,6 +231,8 @@ class Manager(clients.ServiceClients):
**params_v3) **params_v3)
self.oauth_consumers_client = self.identity_v3.OAUTHConsumerClient( self.oauth_consumers_client = self.identity_v3.OAUTHConsumerClient(
**params_v3) **params_v3)
self.domain_config_client = self.identity_v3.DomainConfigurationClient(
**params_v3)
# Token clients do not use the catalog. They only need default_params. # Token clients do not use the catalog. They only need default_params.
# They read auth_url, so they should only be set if the corresponding # They read auth_url, so they should only be set if the corresponding

View File

@ -14,6 +14,8 @@
from tempest.lib.services.identity.v3.credentials_client import \ from tempest.lib.services.identity.v3.credentials_client import \
CredentialsClient CredentialsClient
from tempest.lib.services.identity.v3.domain_configuration_client \
import DomainConfigurationClient
from tempest.lib.services.identity.v3.domains_client import DomainsClient from tempest.lib.services.identity.v3.domains_client import DomainsClient
from tempest.lib.services.identity.v3.endpoints_client import EndPointsClient from tempest.lib.services.identity.v3.endpoints_client import EndPointsClient
from tempest.lib.services.identity.v3.groups_client import GroupsClient from tempest.lib.services.identity.v3.groups_client import GroupsClient
@ -34,9 +36,9 @@ from tempest.lib.services.identity.v3.trusts_client import TrustsClient
from tempest.lib.services.identity.v3.users_client import UsersClient from tempest.lib.services.identity.v3.users_client import UsersClient
from tempest.lib.services.identity.v3.versions_client import VersionsClient from tempest.lib.services.identity.v3.versions_client import VersionsClient
__all__ = ['CredentialsClient', 'DomainsClient', 'EndPointsClient', __all__ = ['CredentialsClient', 'DomainsClient', 'DomainConfigurationClient',
'GroupsClient', 'IdentityClient', 'InheritedRolesClient', 'EndPointsClient', 'GroupsClient', 'IdentityClient',
'PoliciesClient', 'ProjectsClient', 'RegionsClient', 'InheritedRolesClient', 'OAUTHConsumerClient', 'PoliciesClient',
'RoleAssignmentsClient', 'RolesClient', 'ServicesClient', 'ProjectsClient', 'RegionsClient', 'RoleAssignmentsClient',
'V3TokenClient', 'TrustsClient', 'UsersClient', 'VersionsClient', 'RolesClient', 'ServicesClient', 'V3TokenClient', 'TrustsClient',
'OAUTHConsumerClient'] 'UsersClient', 'VersionsClient']

View File

@ -0,0 +1,188 @@
# Copyright 2017 AT&T Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_serialization import jsonutils as json
from tempest.lib.common import rest_client
class DomainConfigurationClient(rest_client.RestClient):
api_version = "v3"
def show_default_config_settings(self):
"""Show default configuration settings.
For a full list of available parameters, please refer to the official
API reference:
http://developer.openstack.org/api-ref/identity/v3/index.html#show-default-configuration-settings
"""
url = 'domains/config/default'
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def show_default_group_config(self, group):
"""Show default configuration for a group.
For a full list of available parameters, please refer to the official
API reference:
http://developer.openstack.org/api-ref/identity/v3/index.html#show-default-configuration-for-a-group
"""
url = 'domains/config/%s/default' % group
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def show_default_group_option(self, group, option):
"""Show default option for a group.
For a full list of available parameters, please refer to the official
API reference:
http://developer.openstack.org/api-ref/identity/v3/index.html#show-default-option-for-a-group
"""
url = 'domains/config/%s/%s/default' % (group, option)
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def show_domain_group_option_config(self, domain_id, group, option):
"""Show domain group option configuration.
For a full list of available parameters, please refer to the official
API reference:
http://developer.openstack.org/api-ref/identity/v3/index.html#show-domain-group-option-configuration
"""
url = 'domains/%s/config/%s/%s' % (domain_id, group, option)
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def update_domain_group_option_config(self, domain_id, group, option,
**kwargs):
"""Update domain group option configuration.
For a full list of available parameters, please refer to the official
API reference:
http://developer.openstack.org/api-ref/identity/v3/index.html#update-domain-group-option-configuration
"""
url = 'domains/%s/config/%s/%s' % (domain_id, group, option)
resp, body = self.patch(url, json.dumps({'config': kwargs}))
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def delete_domain_group_option_config(self, domain_id, group, option):
"""Delete domain group option configuration.
For a full list of available parameters, please refer to the official
API reference:
http://developer.openstack.org/api-ref/identity/v3/index.html#delete-domain-group-option-configuration
"""
url = 'domains/%s/config/%s/%s' % (domain_id, group, option)
resp, body = self.delete(url)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp, body)
def show_domain_group_config(self, domain_id, group):
"""Shows details for a domain group configuration.
For a full list of available parameters, please refer to the official
API reference:
http://developer.openstack.org/api-ref/identity/v3/index.html#show-domain-group-configuration
"""
url = 'domains/%s/config/%s' % (domain_id, group)
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def update_domain_group_config(self, domain_id, group, **kwargs):
"""Update domain group configuration.
For a full list of available parameters, please refer to the official
API reference:
http://developer.openstack.org/api-ref/identity/v3/index.html#update-domain-group-configuration
"""
url = 'domains/%s/config/%s' % (domain_id, group)
resp, body = self.patch(url, json.dumps({'config': kwargs}))
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def delete_domain_group_config(self, domain_id, group):
"""Delete domain group configuration.
For a full list of available parameters, please refer to the official
API reference:
http://developer.openstack.org/api-ref/identity/v3/index.html#delete-domain-group-configuration
"""
url = 'domains/%s/config/%s' % (domain_id, group)
resp, body = self.delete(url)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp, body)
def create_domain_config(self, domain_id, **kwargs):
"""Create domain configuration.
For a full list of available parameters, please refer to the official
API reference:
http://developer.openstack.org/api-ref/identity/v3/index.html#create-domain-configuration
"""
url = 'domains/%s/config' % domain_id
resp, body = self.put(url, json.dumps({'config': kwargs}))
self.expected_success([200, 201], resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def show_domain_config(self, domain_id):
"""Show domain configuration.
For a full list of available parameters, please refer to the official
API reference:
http://developer.openstack.org/api-ref/identity/v3/index.html#show-domain-configuration
"""
url = 'domains/%s/config' % domain_id
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def update_domain_config(self, domain_id, **kwargs):
"""Update domain configuration.
For a full list of available parameters, please refer to the official
API reference:
http://developer.openstack.org/api-ref/identity/v3/index.html#update-domain-configuration
"""
url = 'domains/%s/config' % domain_id
resp, body = self.patch(url, json.dumps({'config': kwargs}))
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def delete_domain_config(self, domain_id):
"""Delete domain configuration.
For a full list of available parameters, please refer to the official
API reference:
http://developer.openstack.org/api-ref/identity/v3/index.html#delete-domain-configuration
"""
url = 'domains/%s/config' % domain_id
resp, body = self.delete(url)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp, body)

View File

@ -0,0 +1,217 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tempest.lib.services.identity.v3 import domain_configuration_client
from tempest.tests.lib import fake_auth_provider
from tempest.tests.lib.services import base
class TestDomainConfigurationClient(base.BaseServiceTest):
FAKE_CONFIG_SETTINGS = {
"config": {
"identity": {
"driver": "ldap"
},
"ldap": {
"url": "ldap://localhost",
"user": "",
"suffix": "cn=example,cn=com",
}
}
}
FAKE_DOMAIN_ID = '07ef7d04-2941-4bee-8551-f79f08a021de'
def setUp(self):
super(TestDomainConfigurationClient, self).setUp()
fake_auth = fake_auth_provider.FakeAuthProvider()
self.client = domain_configuration_client.DomainConfigurationClient(
fake_auth, 'identity', 'regionOne')
def _test_show_default_config_settings(self, bytes_body=False):
self.check_service_client_function(
self.client.show_default_config_settings,
'tempest.lib.common.rest_client.RestClient.get',
self.FAKE_CONFIG_SETTINGS,
bytes_body)
def _test_show_default_group_config(self, bytes_body=False):
self.check_service_client_function(
self.client.show_default_group_config,
'tempest.lib.common.rest_client.RestClient.get',
self.FAKE_CONFIG_SETTINGS['config']['ldap'],
bytes_body,
group='ldap')
def _test_show_default_group_option(self, bytes_body=False):
self.check_service_client_function(
self.client.show_default_group_option,
'tempest.lib.common.rest_client.RestClient.get',
{'driver': 'ldap'},
bytes_body,
group='identity',
option='driver')
def _test_show_domain_group_option_config(self, bytes_body=False):
self.check_service_client_function(
self.client.show_domain_group_option_config,
'tempest.lib.common.rest_client.RestClient.get',
{'driver': 'ldap'},
bytes_body,
domain_id=self.FAKE_DOMAIN_ID,
group='identity',
option='driver')
def _test_update_domain_group_option_config(self, bytes_body=False):
self.check_service_client_function(
self.client.update_domain_group_option_config,
'tempest.lib.common.rest_client.RestClient.patch',
self.FAKE_CONFIG_SETTINGS,
bytes_body,
domain_id=self.FAKE_DOMAIN_ID,
group='identity',
option='driver',
url='http://myldap/my_other_root')
def _test_show_domain_group_config(self, bytes_body=False):
self.check_service_client_function(
self.client.show_domain_group_config,
'tempest.lib.common.rest_client.RestClient.get',
self.FAKE_CONFIG_SETTINGS['config']['ldap'],
bytes_body,
domain_id=self.FAKE_DOMAIN_ID,
group='ldap')
def _test_update_domain_group_config(self, bytes_body=False):
self.check_service_client_function(
self.client.update_domain_group_config,
'tempest.lib.common.rest_client.RestClient.patch',
self.FAKE_CONFIG_SETTINGS['config']['ldap'],
bytes_body,
domain_id=self.FAKE_DOMAIN_ID,
group='ldap',
**self.FAKE_CONFIG_SETTINGS['config'])
def _test_create_domain_config(self, bytes_body=False):
self.check_service_client_function(
self.client.create_domain_config,
'tempest.lib.common.rest_client.RestClient.put',
self.FAKE_CONFIG_SETTINGS,
bytes_body,
domain_id=self.FAKE_DOMAIN_ID,
status=201)
def _test_show_domain_config(self, bytes_body=False):
self.check_service_client_function(
self.client.show_domain_config,
'tempest.lib.common.rest_client.RestClient.get',
self.FAKE_CONFIG_SETTINGS,
bytes_body,
domain_id=self.FAKE_DOMAIN_ID)
def _test_update_domain_config(self, bytes_body=False):
self.check_service_client_function(
self.client.update_domain_config,
'tempest.lib.common.rest_client.RestClient.patch',
self.FAKE_CONFIG_SETTINGS,
bytes_body,
domain_id=self.FAKE_DOMAIN_ID)
def test_show_default_config_settings_with_str_body(self):
self._test_show_default_config_settings()
def test_show_default_config_settings_with_bytes_body(self):
self._test_show_default_config_settings(bytes_body=True)
def test_show_default_group_config_with_str_body(self):
self._test_show_default_group_config()
def test_show_default_group_config_with_bytes_body(self):
self._test_show_default_group_config(bytes_body=True)
def test_show_default_group_option_with_str_body(self):
self._test_show_default_group_option()
def test_show_default_group_option_with_bytes_body(self):
self._test_show_default_group_option(bytes_body=True)
def test_show_domain_group_option_config_with_str_body(self):
self._test_show_domain_group_option_config()
def test_show_domain_group_option_config_with_bytes_body(self):
self._test_show_domain_group_option_config(bytes_body=True)
def test_update_domain_group_option_config_with_str_body(self):
self._test_update_domain_group_option_config()
def test_update_domain_group_option_config_with_bytes_body(self):
self._test_update_domain_group_option_config(bytes_body=True)
def test_delete_domain_group_option_config(self):
self.check_service_client_function(
self.client.delete_domain_group_option_config,
'tempest.lib.common.rest_client.RestClient.delete',
{},
status=204,
domain_id=self.FAKE_DOMAIN_ID,
group='identity',
option='driver')
def test_show_domain_group_config_with_str_body(self):
self._test_show_domain_group_config()
def test_show_domain_group_config_with_bytes_body(self):
self._test_show_domain_group_config(bytes_body=True)
def test_test_update_domain_group_config_with_str_body(self):
self._test_update_domain_group_config()
def test_update_domain_group_config_with_bytes_body(self):
self._test_update_domain_group_config(bytes_body=True)
def test_delete_domain_group_config(self):
self.check_service_client_function(
self.client.delete_domain_group_config,
'tempest.lib.common.rest_client.RestClient.delete',
{},
status=204,
domain_id=self.FAKE_DOMAIN_ID,
group='identity')
def test_create_domain_config_with_str_body(self):
self._test_create_domain_config()
def test_create_domain_config_with_bytes_body(self):
self._test_create_domain_config(bytes_body=True)
def test_show_domain_config_with_str_body(self):
self._test_show_domain_config()
def test_show_domain_config_with_bytes_body(self):
self._test_show_domain_config(bytes_body=True)
def test_update_domain_config_with_str_body(self):
self._test_update_domain_config()
def test_update_domain_config_with_bytes_body(self):
self._test_update_domain_config(bytes_body=True)
def test_delete_domain_config(self):
self.check_service_client_function(
self.client.delete_domain_config,
'tempest.lib.common.rest_client.RestClient.delete',
{},
status=204,
domain_id=self.FAKE_DOMAIN_ID)