Use system_reader in identity v3 admin tests

Add RBAC support to identity v3 admin tests by introducing system_reader
credentials for read operations (list/show) while retaining admin
credentials for write operations (create/update/delete). This ensures
proper scope enforcement when keystone enforce_scope is enabled.

Implements-blueprint: test-keystone-with-srbac-defaults
Change-Id: Idb6723dd42eeec68ef115d2d8860e36abf2c5276
Signed-off-by: Abhishek Bongale <abhishekbongale@outlook.com>
This commit is contained in:
Abhishek Bongale
2025-12-19 12:31:49 +00:00
parent 32545c430b
commit 8732781c64
12 changed files with 273 additions and 82 deletions

View File

@@ -26,6 +26,27 @@ CONF = config.CONF
class DomainsTestJSON(base.BaseIdentityV3AdminTest):
"""Test identity domains"""
credentials = ['primary', 'admin', 'system_reader']
@classmethod
def setup_clients(cls):
super(DomainsTestJSON, cls).setup_clients()
if CONF.identity.use_system_token:
# Use system reader for listing/showing domains
cls.reader_domains_client = (
cls.os_system_reader.domains_client)
# Use system reader for showing users
cls.reader_users_client = (
cls.os_system_reader.users_v3_client)
# Use system reader for showing groups
cls.reader_groups_client = (
cls.os_system_reader.groups_client)
else:
# Use admin client by default
cls.reader_domains_client = cls.domains_client
cls.reader_users_client = cls.users_client
cls.reader_groups_client = cls.groups_client
@classmethod
def resource_setup(cls):
super(DomainsTestJSON, cls).resource_setup()
@@ -41,7 +62,7 @@ class DomainsTestJSON(base.BaseIdentityV3AdminTest):
"""Test listing domains"""
fetched_ids = list()
# List and Verify Domains
body = self.domains_client.list_domains()['domains']
body = self.reader_domains_client.list_domains()['domains']
for d in body:
fetched_ids.append(d['id'])
missing_doms = [d for d in self.setup_domains
@@ -52,7 +73,7 @@ class DomainsTestJSON(base.BaseIdentityV3AdminTest):
def test_list_domains_filter_by_name(self):
"""Test listing domains filtering by name"""
params = {'name': self.setup_domains[0]['name']}
fetched_domains = self.domains_client.list_domains(
fetched_domains = self.reader_domains_client.list_domains(
**params)['domains']
# Verify the filtered list is correct, domain names are unique
# so exactly one domain should be found with the provided name
@@ -64,7 +85,7 @@ class DomainsTestJSON(base.BaseIdentityV3AdminTest):
def test_list_domains_filter_by_enabled(self):
"""Test listing domains filtering by enabled domains"""
params = {'enabled': True}
fetched_domains = self.domains_client.list_domains(
fetched_domains = self.reader_domains_client.list_domains(
**params)['domains']
# Verify the filtered list is correct
self.assertIn(self.setup_domains[0], fetched_domains)
@@ -108,14 +129,14 @@ class DomainsTestJSON(base.BaseIdentityV3AdminTest):
self.assertEqual(new_desc, updated_domain['description'])
self.assertEqual(False, updated_domain['enabled'])
# Show domain
fetched_domain = self.domains_client.show_domain(
fetched_domain = self.reader_domains_client.show_domain(
domain['id'])['domain']
self.assertEqual(new_name, fetched_domain['name'])
self.assertEqual(new_desc, fetched_domain['description'])
self.assertEqual(False, fetched_domain['enabled'])
# Delete domain
self.domains_client.delete_domain(domain['id'])
body = self.domains_client.list_domains()['domains']
body = self.reader_domains_client.list_domains()['domains']
domains_list = [d['id'] for d in body]
self.assertNotIn(domain['id'], domains_list)
@@ -130,11 +151,11 @@ class DomainsTestJSON(base.BaseIdentityV3AdminTest):
self.delete_domain(domain['id'])
# Check the domain, its users and groups are gone
self.assertRaises(exceptions.NotFound,
self.domains_client.show_domain, domain['id'])
self.reader_domains_client.show_domain, domain['id'])
self.assertRaises(exceptions.NotFound,
self.users_client.show_user, user['id'])
self.reader_users_client.show_user, user['id'])
self.assertRaises(exceptions.NotFound,
self.groups_client.show_group, group['id'])
self.reader_groups_client.show_group, group['id'])
@decorators.idempotent_id('036df86e-bb5d-42c0-a7c2-66b9db3a6046')
def test_create_domain_with_disabled_status(self):

View File

@@ -30,10 +30,21 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest):
# pre-provisioned credentials provider.
force_tenant_isolation = False
credentials = ['primary', 'admin', 'system_reader']
@classmethod
def setup_clients(cls):
super(EndPointsTestJSON, cls).setup_clients()
cls.client = cls.endpoints_client
if CONF.identity.use_system_token:
# Use system reader for listing/showing endpoints
cls.reader_client = cls.os_system_reader.endpoints_v3_client
# Use system reader for showing regions
cls.reader_regions_client = cls.os_system_reader.regions_client
else:
# Use admin client by default
cls.reader_client = cls.client
cls.reader_regions_client = cls.regions_client
@classmethod
def resource_setup(cls):
@@ -55,7 +66,8 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest):
endpoint = cls.client.create_endpoint(
service_id=cls.service_ids[i], interface=interfaces[i],
url=url, region=region_name, enabled=True)['endpoint']
region = cls.regions_client.show_region(region_name)['region']
region = cls.reader_regions_client.show_region(region_name)[
'region']
cls.addClassResourceCleanup(
cls.regions_client.delete_region, region['id'])
cls.addClassResourceCleanup(
@@ -81,7 +93,7 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest):
def test_list_endpoints(self):
"""Test listing keystone endpoints by filters"""
# Get the list of all the endpoints.
fetched_endpoints = self.client.list_endpoints()['endpoints']
fetched_endpoints = self.reader_client.list_endpoints()['endpoints']
fetched_endpoint_ids = [e['id'] for e in fetched_endpoints]
# Check that all the created endpoints are present in
# "fetched_endpoints".
@@ -93,9 +105,9 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest):
', '.join(str(e) for e in missing_endpoints))
# Check that filtering endpoints by service_id works.
fetched_endpoints_for_service = self.client.list_endpoints(
fetched_endpoints_for_service = self.reader_client.list_endpoints(
service_id=self.service_ids[0])['endpoints']
fetched_endpoints_for_alt_service = self.client.list_endpoints(
fetched_endpoints_for_alt_service = self.reader_client.list_endpoints(
service_id=self.service_ids[1])['endpoints']
# Assert that both filters returned the correct result.
@@ -106,9 +118,9 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest):
fetched_endpoints_for_alt_service[0]['id']]))
# Check that filtering endpoints by interface works.
fetched_public_endpoints = self.client.list_endpoints(
fetched_public_endpoints = self.reader_client.list_endpoints(
interface='public')['endpoints']
fetched_internal_endpoints = self.client.list_endpoints(
fetched_internal_endpoints = self.reader_client.list_endpoints(
interface='internal')['endpoints']
# Check that the expected endpoint_id is present per filter. [0] is
@@ -129,7 +141,7 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest):
interface=interface,
url=url, region=region_name,
enabled=True)['endpoint']
region = self.regions_client.show_region(region_name)['region']
region = self.reader_regions_client.show_region(region_name)['region']
self.addCleanup(self.regions_client.delete_region, region['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.client.delete_endpoint, endpoint['id'])
@@ -138,13 +150,13 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest):
self.assertEqual(url, endpoint['url'])
# Checking if created endpoint is present in the list of endpoints
fetched_endpoints = self.client.list_endpoints()['endpoints']
fetched_endpoints = self.reader_client.list_endpoints()['endpoints']
fetched_endpoints_id = [e['id'] for e in fetched_endpoints]
self.assertIn(endpoint['id'], fetched_endpoints_id)
# Show endpoint
fetched_endpoint = (
self.client.show_endpoint(endpoint['id'])['endpoint'])
self.reader_client.show_endpoint(endpoint['id'])['endpoint'])
# Asserting if the attributes of endpoint are the same
self.assertEqual(self.service_ids[0], fetched_endpoint['service_id'])
self.assertEqual(interface, fetched_endpoint['interface'])
@@ -156,7 +168,7 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest):
self.client.delete_endpoint(endpoint['id'])
# Checking whether endpoint is deleted successfully
fetched_endpoints = self.client.list_endpoints()['endpoints']
fetched_endpoints = self.reader_client.list_endpoints()['endpoints']
fetched_endpoints_id = [e['id'] for e in fetched_endpoints]
self.assertNotIn(endpoint['id'], fetched_endpoints_id)
@@ -187,7 +199,8 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest):
interface=interface1,
url=url1, region=region1_name,
enabled=True)['endpoint'])
region1 = self.regions_client.show_region(region1_name)['region']
region1 = self.reader_regions_client.show_region(region1_name)[
'region']
self.addCleanup(self.regions_client.delete_region, region1['id'])
# Updating endpoint with new values
@@ -199,7 +212,8 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest):
interface=interface2,
url=url2, region=region2_name,
enabled=False)['endpoint']
region2 = self.regions_client.show_region(region2_name)['region']
region2 = self.reader_regions_client.show_region(region2_name)[
'region']
self.addCleanup(self.regions_client.delete_region, region2['id'])
self.addCleanup(self.client.delete_endpoint, endpoint_for_update['id'])

View File

@@ -30,6 +30,23 @@ class GroupsV3TestJSON(base.BaseIdentityV3AdminTest):
# pre-provisioned credentials provider.
force_tenant_isolation = False
credentials = ['primary', 'admin', 'system_reader']
@classmethod
def setup_clients(cls):
super(GroupsV3TestJSON, cls).setup_clients()
if CONF.identity.use_system_token:
# Use system reader for listing/showing groups
cls.reader_groups_client = (
cls.os_system_reader.groups_client)
# Use system reader for listing user groups
cls.reader_users_client = (
cls.os_system_reader.users_v3_client)
else:
# Use admin client by default
cls.reader_groups_client = cls.groups_client
cls.reader_users_client = cls.users_client
@classmethod
def resource_setup(cls):
super(GroupsV3TestJSON, cls).resource_setup()
@@ -60,7 +77,7 @@ class GroupsV3TestJSON(base.BaseIdentityV3AdminTest):
self.assertEqual(updated_group['description'], first_desc_update)
# Verify that the updated values are reflected after performing show.
new_group = self.groups_client.show_group(group['id'])['group']
new_group = self.reader_groups_client.show_group(group['id'])['group']
self.assertEqual(group['id'], new_group['id'])
self.assertEqual(first_name_update, new_group['name'])
self.assertEqual(first_desc_update, new_group['description'])
@@ -94,7 +111,8 @@ class GroupsV3TestJSON(base.BaseIdentityV3AdminTest):
self.groups_client.add_group_user(group['id'], user['id'])
# list users in group
group_users = self.groups_client.list_group_users(group['id'])['users']
group_users = self.reader_groups_client.list_group_users(group['id'])[
'users']
self.assertEqual(sorted(users, key=lambda k: k['name']),
sorted(group_users, key=lambda k: k['name']))
# check and delete user in group
@@ -102,7 +120,8 @@ class GroupsV3TestJSON(base.BaseIdentityV3AdminTest):
self.groups_client.check_group_user_existence(
group['id'], user['id'])
self.groups_client.delete_group_user(group['id'], user['id'])
group_users = self.groups_client.list_group_users(group['id'])['users']
group_users = self.reader_groups_client.list_group_users(group['id'])[
'users']
self.assertEqual(len(group_users), 0)
@decorators.idempotent_id('64573281-d26a-4a52-b899-503cb0f4e4ec')
@@ -121,7 +140,8 @@ class GroupsV3TestJSON(base.BaseIdentityV3AdminTest):
groups.append(group)
self.groups_client.add_group_user(group['id'], user['id'])
# list groups which user belongs to
user_groups = self.users_client.list_user_groups(user['id'])['groups']
user_groups = self.reader_users_client.list_user_groups(user['id'])[
'groups']
# The `membership_expires_at` attribute is present when listing user
# group memberships, and is not an attribute of the groups themselves.
# Therefore we remove it from the comparison.
@@ -146,10 +166,10 @@ class GroupsV3TestJSON(base.BaseIdentityV3AdminTest):
# of listing all users and listing all groups are not supported,
# they need a domain filter to be specified
if CONF.identity_feature_enabled.domain_specific_drivers:
body = self.groups_client.list_groups(
body = self.reader_groups_client.list_groups(
domain_id=self.domain['id'])['groups']
else:
body = self.groups_client.list_groups()['groups']
body = self.reader_groups_client.list_groups()['groups']
for g in body:
fetched_ids.append(g['id'])
missing_groups = [g for g in group_ids if g not in fetched_ids]

View File

@@ -26,13 +26,26 @@ CONF = config.CONF
class BaseListProjectsTestJSON(base.BaseIdentityV3AdminTest):
credentials = ['primary', 'admin', 'system_reader']
@classmethod
def setup_clients(cls):
super(BaseListProjectsTestJSON, cls).setup_clients()
if CONF.identity.use_system_token:
# Use system reader for listing projects
cls.reader_projects_client = (
cls.os_system_reader.projects_client)
else:
# Use admin client by default
cls.reader_projects_client = cls.projects_client
def _list_projects_with_params(self, included, excluded, params, key):
# Validate that projects in ``included`` belongs to the projects
# returned that match ``params`` but not projects in ``excluded``
all_projects = self.projects_client.list_projects()['projects']
all_projects = self.reader_projects_client.list_projects()['projects']
LOG.debug("Complete list of projects available in keystone: %s",
all_projects)
body = self.projects_client.list_projects(params)['projects']
body = self.reader_projects_client.list_projects(params)['projects']
for p in included:
self.assertIn(p[key], map(lambda x: x[key], body))
for p in excluded:
@@ -75,7 +88,7 @@ class ListProjectsTestJSON(BaseListProjectsTestJSON):
def test_list_projects_with_parent(self):
"""Test listing projects with parent"""
params = {'parent_id': self.p3['parent_id']}
fetched_projects = self.projects_client.list_projects(
fetched_projects = self.reader_projects_client.list_projects(
params)['projects']
self.assertNotEmpty(fetched_projects)
for project in fetched_projects:
@@ -111,10 +124,10 @@ class ListProjectsStaticTestJSON(BaseListProjectsTestJSON):
@decorators.idempotent_id('1d830662-22ad-427c-8c3e-4ec854b0af44')
def test_list_projects(self):
"""Test listing projects"""
list_projects = self.projects_client.list_projects()['projects']
list_projects = self.reader_projects_client.list_projects()['projects']
for p in [self.p1, self.p2]:
show_project = self.projects_client.show_project(p['id'])[
show_project = self.reader_projects_client.show_project(p['id'])[
'project']
self.assertIn(show_project, list_projects)

View File

@@ -24,12 +24,25 @@ CONF = config.CONF
class UsersV3TestJSON(base.BaseIdentityV3AdminTest):
"""Test listing keystone users"""
credentials = ['primary', 'admin', 'system_reader']
@classmethod
def setup_clients(cls):
super(UsersV3TestJSON, cls).setup_clients()
if CONF.identity.use_system_token:
# Use system reader for listing users
cls.reader_users_client = (
cls.os_system_reader.users_v3_client)
else:
# Use admin client by default
cls.reader_users_client = cls.users_client
def _list_users_with_params(self, params, key, expected, not_expected):
# Helper method to list users filtered with params and
# assert the response based on expected and not_expected
# expected: user expected in the list response
# not_expected: user, which should not be present in list response
body = self.users_client.list_users(**params)['users']
body = self.reader_users_client.list_users(**params)['users']
self.assertIn(expected[key], map(lambda x: x[key], body))
self.assertNotIn(not_expected[key],
map(lambda x: x[key], body))
@@ -105,13 +118,13 @@ class UsersV3TestJSON(base.BaseIdentityV3AdminTest):
# of listing all users and listing all groups are not supported,
# they need a domain filter to be specified
if CONF.identity_feature_enabled.domain_specific_drivers:
body_enabled_user = self.users_client.list_users(
body_enabled_user = self.reader_users_client.list_users(
domain_id=self.domain_enabled_user['domain_id'])['users']
body_non_enabled_user = self.users_client.list_users(
body_non_enabled_user = self.reader_users_client.list_users(
domain_id=self.non_domain_enabled_user['domain_id'])['users']
body = (body_enabled_user + body_non_enabled_user)
else:
body = self.users_client.list_users()['users']
body = self.reader_users_client.list_users()['users']
fetched_ids = [u['id'] for u in body]
missing_users = [u['id'] for u in self.users
@@ -123,7 +136,7 @@ class UsersV3TestJSON(base.BaseIdentityV3AdminTest):
@decorators.idempotent_id('b4baa3ae-ac00-4b4e-9e27-80deaad7771f')
def test_get_user(self):
"""Get a user detail"""
user = self.users_client.show_user(self.users[0]['id'])['user']
user = self.reader_users_client.show_user(self.users[0]['id'])['user']
self.assertEqual(self.users[0]['id'], user['id'])
self.assertEqual(self.users[0]['name'], user['name'])
self.assertEqual(self.alt_email, user['email'])

View File

@@ -24,6 +24,19 @@ CONF = config.CONF
class PoliciesTestJSON(base.BaseIdentityV3AdminTest):
"""Test keystone policies"""
credentials = ['primary', 'admin', 'system_reader']
@classmethod
def setup_clients(cls):
super(PoliciesTestJSON, cls).setup_clients()
if CONF.identity.use_system_token:
# Use system reader for listing/showing policies
cls.reader_policies_client = (
cls.os_system_reader.policies_client)
else:
# Use admin client by default
cls.reader_policies_client = cls.policies_client
def _delete_policy(self, policy_id):
self.policies_client.delete_policy(policy_id)
@@ -43,7 +56,7 @@ class PoliciesTestJSON(base.BaseIdentityV3AdminTest):
self.addCleanup(self._delete_policy, policy['id'])
policy_ids.append(policy['id'])
# List and Verify Policies
body = self.policies_client.list_policies()['policies']
body = self.reader_policies_client.list_policies()['policies']
for p in body:
fetched_ids.append(p['id'])
missing_pols = [p for p in policy_ids if p not in fetched_ids]
@@ -70,7 +83,7 @@ class PoliciesTestJSON(base.BaseIdentityV3AdminTest):
policy['id'], type=update_type)['policy']
self.assertIn('type', data)
# Assertion for updated value with fetched value
fetched_policy = self.policies_client.show_policy(
fetched_policy = self.reader_policies_client.show_policy(
policy['id'])['policy']
self.assertIn('id', fetched_policy)
self.assertIn('blob', fetched_policy)

View File

@@ -30,6 +30,27 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest):
# pre-provisioned credentials provider.
force_tenant_isolation = False
credentials = ['primary', 'admin', 'system_reader']
@classmethod
def setup_clients(cls):
super(ProjectsTestJSON, cls).setup_clients()
if CONF.identity.use_system_token:
# Use system reader for listing/showing projects
cls.reader_projects_client = (
cls.os_system_reader.projects_client)
# Use system reader for listing/showing domains
cls.reader_domains_client = (
cls.os_system_reader.domains_client)
# Use system reader for showing users
cls.reader_users_client = (
cls.os_system_reader.users_v3_client)
else:
# Use admin client by default
cls.reader_projects_client = cls.projects_client
cls.reader_domains_client = cls.domains_client
cls.reader_users_client = cls.users_client
@decorators.idempotent_id('0ecf465c-0dc4-4532-ab53-91ffeb74d12d')
def test_project_create_with_description(self):
"""Test creating project with a description"""
@@ -40,7 +61,7 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest):
desc1 = project['description']
self.assertEqual(desc1, project_desc, 'Description should have '
'been sent in response for create')
body = self.projects_client.show_project(project_id)['project']
body = self.reader_projects_client.show_project(project_id)['project']
desc2 = body['description']
self.assertEqual(desc2, project_desc, 'Description does not appear '
'to be set')
@@ -56,7 +77,7 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest):
project_id = project['id']
self.assertEqual(project_name, project['name'])
self.assertEqual(domain['id'], project['domain_id'])
body = self.projects_client.show_project(project_id)['project']
body = self.reader_projects_client.show_project(project_id)['project']
self.assertEqual(project_name, body['name'])
self.assertEqual(domain['id'], body['domain_id'])
@@ -97,15 +118,15 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest):
# Check if the is_domain project is correctly returned by both
# project and domain APIs
projects_list = self.projects_client.list_projects(
projects_list = self.reader_projects_client.list_projects(
params={'is_domain': True})['projects']
project_ids = [p['id'] for p in projects_list]
self.assertIn(project['id'], project_ids)
# The domains API return different attributes for the entity, so we
# compare the entities IDs
domains_ids = [d['id'] for d in self.domains_client.list_domains()[
'domains']]
domains_list = self.reader_domains_client.list_domains()['domains']
domains_ids = [d['id'] for d in domains_list]
self.assertIn(project['id'], domains_ids)
@decorators.idempotent_id('1f66dc76-50cc-4741-a200-af984509e480')
@@ -115,7 +136,7 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest):
project_id = project['id']
self.assertTrue(project['enabled'],
'Enable should be True in response')
body = self.projects_client.show_project(project_id)['project']
body = self.reader_projects_client.show_project(project_id)['project']
self.assertTrue(body['enabled'], 'Enable should be True in lookup')
@decorators.idempotent_id('78f96a9c-e0e0-4ee6-a3ba-fbf6dfd03207')
@@ -124,7 +145,8 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest):
project = self.setup_test_project(enabled=False)
self.assertFalse(project['enabled'],
'Enable should be False in response')
body = self.projects_client.show_project(project['id'])['project']
body = self.reader_projects_client.show_project(project['id'])[
'project']
self.assertFalse(body['enabled'],
'Enable should be False in lookup')
@@ -144,7 +166,8 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest):
resp2_name = body['name']
self.assertNotEqual(resp1_name, resp2_name)
body = self.projects_client.show_project(project['id'])['project']
body = self.reader_projects_client.show_project(project['id'])[
'project']
resp3_name = body['name']
self.assertNotEqual(resp1_name, resp3_name)
@@ -166,7 +189,8 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest):
resp2_desc = body['description']
self.assertNotEqual(resp1_desc, resp2_desc)
body = self.projects_client.show_project(project['id'])['project']
body = self.reader_projects_client.show_project(project['id'])[
'project']
resp3_desc = body['description']
self.assertNotEqual(resp1_desc, resp3_desc)
@@ -187,7 +211,8 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest):
resp2_en = body['enabled']
self.assertNotEqual(resp1_en, resp2_en)
body = self.projects_client.show_project(project['id'])['project']
body = self.reader_projects_client.show_project(project['id'])[
'project']
resp3_en = body['enabled']
self.assertNotEqual(resp1_en, resp3_en)
@@ -217,7 +242,7 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest):
self.addCleanup(self.users_client.delete_user, user['id'])
# Get User To validate the user details
new_user_get = self.users_client.show_user(user['id'])['user']
new_user_get = self.reader_users_client.show_user(user['id'])['user']
# Assert response body of GET
self.assertEqual(u_name, new_user_get['name'])
self.assertEqual(u_desc, new_user_get['description'])
@@ -238,9 +263,9 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest):
project = self.setup_test_project(tags=tags)
# Show and list for the project
project_get = self.projects_client.show_project(
project_get = self.reader_projects_client.show_project(
project['id'])['project']
_projects = self.projects_client.list_projects()['projects']
_projects = self.reader_projects_client.list_projects()['projects']
project_list = next(x for x in _projects if x['id'] == project['id'])
# Assert the expected fields exist. More fields than expected may

View File

@@ -30,10 +30,18 @@ class RegionsTestJSON(base.BaseIdentityV3AdminTest):
# pre-provisioned credentials provider.
force_tenant_isolation = False
credentials = ['primary', 'admin', 'system_reader']
@classmethod
def setup_clients(cls):
super(RegionsTestJSON, cls).setup_clients()
cls.client = cls.regions_client
if CONF.identity.use_system_token:
# Use system reader for listing/showing regions
cls.reader_client = cls.os_system_reader.regions_client
else:
# Use admin client by default
cls.reader_client = cls.client
@classmethod
def resource_setup(cls):
@@ -77,13 +85,13 @@ class RegionsTestJSON(base.BaseIdentityV3AdminTest):
self.assertEqual(self.setup_regions[1]['id'],
region['parent_region_id'])
# Get the details of region
region = self.client.show_region(region['id'])['region']
region = self.reader_client.show_region(region['id'])['region']
self.assertEqual(r_alt_description, region['description'])
self.assertEqual(self.setup_regions[1]['id'],
region['parent_region_id'])
# Delete the region
self.client.delete_region(region['id'])
body = self.client.list_regions()['regions']
body = self.reader_client.list_regions()['regions']
regions_list = [r['id'] for r in body]
self.assertNotIn(region['id'], regions_list)
@@ -104,7 +112,7 @@ class RegionsTestJSON(base.BaseIdentityV3AdminTest):
@decorators.idempotent_id('d180bf99-544a-445c-ad0d-0c0d27663796')
def test_list_regions(self):
"""Test getting a list of regions"""
fetched_regions = self.client.list_regions()['regions']
fetched_regions = self.reader_client.list_regions()['regions']
missing_regions =\
[e for e in self.setup_regions if e not in fetched_regions]
# Asserting List Regions response
@@ -124,7 +132,8 @@ class RegionsTestJSON(base.BaseIdentityV3AdminTest):
self.addCleanup(self.client.delete_region, region['id'])
# Get the list of regions filtering with the parent_region_id
params = {'parent_region_id': self.setup_regions[0]['id']}
fetched_regions = self.client.list_regions(params=params)['regions']
fetched_regions = self.reader_client.list_regions(params=params)[
'regions']
# Asserting list regions response
self.assertIn(region, fetched_regions)
for r in fetched_regions:

View File

@@ -32,6 +32,19 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
# pre-provisioned credentials provider.
force_tenant_isolation = False
credentials = ['primary', 'admin', 'system_reader']
@classmethod
def setup_clients(cls):
super(RolesV3TestJSON, cls).setup_clients()
if CONF.identity.use_system_token:
# Use system reader for listing/showing roles
cls.reader_roles_client = (
cls.os_system_reader.roles_v3_client)
else:
# Use admin client by default
cls.reader_roles_client = cls.roles_client
@classmethod
def resource_setup(cls):
super(RolesV3TestJSON, cls).resource_setup()
@@ -97,11 +110,11 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self.assertIn('links', updated_role)
self.assertNotEqual(r_name, updated_role['name'])
new_role = self.roles_client.show_role(role['id'])['role']
new_role = self.reader_roles_client.show_role(role['id'])['role']
self.assertEqual(new_name, new_role['name'])
self.assertEqual(updated_role['id'], new_role['id'])
roles = self.roles_client.list_roles()['roles']
roles = self.reader_roles_client.list_roles()['roles']
self.assertIn(role['id'], [r['id'] for r in roles])
@decorators.idempotent_id('c6b80012-fe4a-498b-9ce8-eb391c05169f')
@@ -114,7 +127,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self.user_body['id'],
self.role['id'])
roles = self.roles_client.list_user_roles_on_project(
roles = self.reader_roles_client.list_user_roles_on_project(
self.project['id'], self.user_body['id'])['roles']
self.assertEqual(1, len(roles))
@@ -135,7 +148,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self.roles_client.create_user_role_on_domain(
self.domain['id'], self.user_body['id'], self.role['id'])
roles = self.roles_client.list_user_roles_on_domain(
roles = self.reader_roles_client.list_user_roles_on_domain(
self.domain['id'], self.user_body['id'])['roles']
self.assertEqual(1, len(roles))
@@ -155,7 +168,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self.roles_client.create_user_role_on_system(
self.user_body['id'], self.role['id'])
roles = self.roles_client.list_user_roles_on_system(
roles = self.reader_roles_client.list_user_roles_on_system(
self.user_body['id'])['roles']
self.assertEqual(1, len(roles))
@@ -177,7 +190,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self.roles_client.create_group_role_on_project(
self.project['id'], self.group_body['id'], self.role['id'])
# List group roles on project
roles = self.roles_client.list_group_roles_on_project(
roles = self.reader_roles_client.list_group_roles_on_project(
self.project['id'], self.group_body['id'])['roles']
self.assertEqual(1, len(roles))
@@ -210,7 +223,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self.roles_client.create_group_role_on_domain(
self.domain['id'], self.group_body['id'], self.role['id'])
roles = self.roles_client.list_group_roles_on_domain(
roles = self.reader_roles_client.list_group_roles_on_domain(
self.domain['id'], self.group_body['id'])['roles']
self.assertEqual(1, len(roles))
@@ -227,7 +240,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self.roles_client.create_group_role_on_system(
self.group_body['id'], self.role['id'])
roles = self.roles_client.list_group_roles_on_system(
roles = self.reader_roles_client.list_group_roles_on_system(
self.group_body['id'])['roles']
self.assertEqual(1, len(roles))
@@ -243,7 +256,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
def test_list_roles(self):
"""Test listing roles"""
# Return a list of all roles
body = self.roles_client.list_roles()['roles']
body = self.reader_roles_client.list_roles()['roles']
found = [role for role in body if role in self.roles]
self.assertEqual(len(found), len(self.roles))
@@ -278,7 +291,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
prior_role_id, implies_role_id)
# Show the inference rule and check its elements
resp_body = self.roles_client.show_role_inference_rule(
resp_body = self.reader_roles_client.show_role_inference_rule(
prior_role_id, implies_role_id)
self.assertIn('role_inference', resp_body)
role_inference = resp_body['role_inference']
@@ -293,7 +306,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
# Check if the inference rule no longer exists
self.assertRaises(
lib_exc.NotFound,
self.roles_client.show_role_inference_rule,
self.reader_roles_client.show_role_inference_rule,
prior_role_id,
implies_role_id)
@@ -313,14 +326,14 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self.roles[2]['id'], self.role['id'])
# Listing inferences rules from "roles[2]" should only return "role"
rules = self.roles_client.list_role_inferences_rules(
rules = self.reader_roles_client.list_role_inferences_rules(
self.roles[2]['id'])['role_inference']
self.assertEqual(1, len(rules['implies']))
self.assertEqual(self.role['id'], rules['implies'][0]['id'])
# Listing inferences rules from "roles[0]" should return "roles[1]" and
# "roles[2]" (only direct rules are listed)
rules = self.roles_client.list_role_inferences_rules(
rules = self.reader_roles_client.list_role_inferences_rules(
self.roles[0]['id'])['role_inference']
implies_ids = [role['id'] for role in rules['implies']]
self.assertEqual(2, len(implies_ids))
@@ -384,13 +397,13 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self.roles_client.delete_role,
domain_role['id'])
domain_roles = self.roles_client.list_roles(
domain_roles = self.reader_roles_client.list_roles(
domain_id=self.domain['id'])['roles']
self.assertEqual(1, len(domain_roles))
self.assertIn(domain_role, domain_roles)
self.roles_client.delete_role(domain_role['id'])
domain_roles = self.roles_client.list_roles(
domain_roles = self.reader_roles_client.list_roles(
domain_id=self.domain['id'])['roles']
self.assertEmpty(domain_roles)
@@ -465,7 +478,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self._create_implied_role(
self.roles[2]['id'], self.role['id'])
rules = self.roles_client.list_all_role_inference_rules()[
rules = self.reader_roles_client.list_all_role_inference_rules()[
'role_inferences']
# NOTE(jaosorior): With the work related to the define-default-roles

View File

@@ -25,11 +25,25 @@ CONF = config.CONF
class ServicesTestJSON(base.BaseIdentityV3AdminTest):
"""Test keystone services"""
credentials = ['primary', 'admin', 'system_reader']
@classmethod
def setup_clients(cls):
super(ServicesTestJSON, cls).setup_clients()
if CONF.identity.use_system_token:
# Use system reader for listing/showing services
cls.reader_services_client = (
cls.os_system_reader.identity_services_v3_client)
else:
# Use admin client by default
cls.reader_services_client = cls.services_client
def _del_service(self, service_id):
# Used for deleting the services created in this class
self.services_client.delete_service(service_id)
# Checking whether service is deleted successfully
self.assertRaises(lib_exc.NotFound, self.services_client.show_service,
self.assertRaises(lib_exc.NotFound,
self.reader_services_client.show_service,
service_id)
@decorators.attr(type='smoke')
@@ -61,7 +75,8 @@ class ServicesTestJSON(base.BaseIdentityV3AdminTest):
self.assertNotEqual(resp1_desc, resp2_desc)
# Get service
fetched_service = self.services_client.show_service(s_id)['service']
fetched_service = self.reader_services_client.show_service(s_id)[
'service']
resp3_desc = fetched_service['description']
self.assertEqual(resp2_desc, resp3_desc)
@@ -100,14 +115,14 @@ class ServicesTestJSON(base.BaseIdentityV3AdminTest):
service_types.append(serv_type)
# List and Verify Services
services = self.services_client.list_services()['services']
services = self.reader_services_client.list_services()['services']
fetched_ids = [service['id'] for service in services]
found = [s for s in fetched_ids if s in service_ids]
self.assertEqual(len(found), len(service_ids))
# Check that filtering by service type works.
for serv_type in service_types:
fetched_services = self.services_client.list_services(
fetched_services = self.reader_services_client.list_services(
type=serv_type)['services']
self.assertEqual(1, len(fetched_services))
self.assertEqual(serv_type, fetched_services[0]['type'])

View File

@@ -29,6 +29,19 @@ CONF = config.CONF
class TrustsV3TestJSON(base.BaseIdentityV3AdminTest):
"""Test keystone trusts"""
credentials = ['primary', 'admin', 'system_reader']
@classmethod
def setup_clients(cls):
super(TrustsV3TestJSON, cls).setup_clients()
if CONF.identity.use_system_token:
# Use system reader for listing trusts
cls.reader_trusts_client = (
cls.os_system_reader.trusts_client)
else:
# Use admin client by default
cls.reader_trusts_client = cls.trusts_client
@classmethod
def skip_checks(cls):
super(TrustsV3TestJSON, cls).skip_checks()
@@ -293,7 +306,7 @@ class TrustsV3TestJSON(base.BaseIdentityV3AdminTest):
original_scope = self.os_admin.auth_provider.scope
set_scope(self.os_admin.auth_provider, 'project')
self.addCleanup(set_scope, self.os_admin.auth_provider, original_scope)
trusts_get = self.trusts_client.list_trusts()['trusts']
trusts_get = self.reader_trusts_client.list_trusts()['trusts']
trusts = [t for t in trusts_get
if t['id'] == self.trust_id]
self.assertEqual(1, len(trusts))

View File

@@ -29,6 +29,27 @@ CONF = config.CONF
class UsersV3TestJSON(base.BaseIdentityV3AdminTest):
"""Test keystone users"""
credentials = ['primary', 'admin', 'system_reader']
@classmethod
def setup_clients(cls):
super(UsersV3TestJSON, cls).setup_clients()
if CONF.identity.use_system_token:
# Use system reader for listing/showing users
cls.reader_users_client = (
cls.os_system_reader.users_v3_client)
# Use system reader for showing roles
cls.reader_roles_client = (
cls.os_system_reader.roles_v3_client)
# Use system reader for showing projects
cls.reader_projects_client = (
cls.os_system_reader.projects_client)
else:
# Use admin client by default
cls.reader_users_client = cls.users_client
cls.reader_roles_client = cls.roles_client
cls.reader_projects_client = cls.projects_client
@classmethod
def skip_checks(cls):
super(UsersV3TestJSON, cls).skip_checks()
@@ -67,7 +88,7 @@ class UsersV3TestJSON(base.BaseIdentityV3AdminTest):
self.assertEqual(update_kwargs[field], updated_user[field])
# GET by id after updating
new_user_get = self.users_client.show_user(user['id'])['user']
new_user_get = self.reader_users_client.show_user(user['id'])['user']
# Assert response body of GET after updation
for field in update_kwargs:
self.assertEqual(update_kwargs[field], new_user_get[field])
@@ -120,19 +141,20 @@ class UsersV3TestJSON(base.BaseIdentityV3AdminTest):
# Creating Role
role_body = self.setup_test_role()
user = self.users_client.show_user(user_body['id'])['user']
role = self.roles_client.show_role(role_body['id'])['role']
user = self.reader_users_client.show_user(user_body['id'])['user']
role = self.reader_roles_client.show_role(role_body['id'])['role']
for _ in range(2):
# Creating project so as to assign role
project_body = self.setup_test_project()
project = self.projects_client.show_project(
project = self.reader_projects_client.show_project(
project_body['id'])['project']
# Assigning roles to user on project
self.roles_client.create_user_role_on_project(project['id'],
user['id'],
role['id'])
assigned_project_ids.append(project['id'])
body = self.users_client.list_user_projects(user['id'])['projects']
body = self.reader_users_client.list_user_projects(user['id'])[
'projects']
for i in body:
fetched_project_ids.append(i['id'])
# verifying the project ids in list
@@ -148,7 +170,7 @@ class UsersV3TestJSON(base.BaseIdentityV3AdminTest):
def test_get_user(self):
"""Test getting a user detail"""
user = self.setup_test_user()
fetched_user = self.users_client.show_user(user['id'])['user']
fetched_user = self.reader_users_client.show_user(user['id'])['user']
self.assertEqual(user['id'], fetched_user['id'])
@testtools.skipUnless(CONF.identity_feature_enabled.security_compliance,