From 8732781c645b3dc5c14d91fa708f840613560bc5 Mon Sep 17 00:00:00 2001 From: Abhishek Bongale Date: Fri, 19 Dec 2025 12:31:49 +0000 Subject: [PATCH] Use system_reader in identity v3 admin tests Add RBAC support to identity v3 admin tests by introducing system_reader credentials for read operations (list/show) while retaining admin credentials for write operations (create/update/delete). This ensures proper scope enforcement when keystone enforce_scope is enabled. Implements-blueprint: test-keystone-with-srbac-defaults Change-Id: Idb6723dd42eeec68ef115d2d8860e36abf2c5276 Signed-off-by: Abhishek Bongale --- tempest/api/identity/admin/v3/test_domains.py | 37 +++++++++++--- .../api/identity/admin/v3/test_endpoints.py | 38 +++++++++----- tempest/api/identity/admin/v3/test_groups.py | 32 +++++++++--- .../identity/admin/v3/test_list_projects.py | 23 +++++++-- .../api/identity/admin/v3/test_list_users.py | 23 +++++++-- .../api/identity/admin/v3/test_policies.py | 17 ++++++- .../api/identity/admin/v3/test_projects.py | 51 ++++++++++++++----- tempest/api/identity/admin/v3/test_regions.py | 17 +++++-- tempest/api/identity/admin/v3/test_roles.py | 45 ++++++++++------ .../api/identity/admin/v3/test_services.py | 23 +++++++-- tempest/api/identity/admin/v3/test_trusts.py | 15 +++++- tempest/api/identity/admin/v3/test_users.py | 34 ++++++++++--- 12 files changed, 273 insertions(+), 82 deletions(-) diff --git a/tempest/api/identity/admin/v3/test_domains.py b/tempest/api/identity/admin/v3/test_domains.py index 80c4d1c9a3..7291a0bf8a 100644 --- a/tempest/api/identity/admin/v3/test_domains.py +++ b/tempest/api/identity/admin/v3/test_domains.py @@ -26,6 +26,27 @@ CONF = config.CONF class DomainsTestJSON(base.BaseIdentityV3AdminTest): """Test identity domains""" + credentials = ['primary', 'admin', 'system_reader'] + + @classmethod + def setup_clients(cls): + super(DomainsTestJSON, cls).setup_clients() + if CONF.identity.use_system_token: + # Use system reader for listing/showing domains + cls.reader_domains_client = ( + cls.os_system_reader.domains_client) + # Use system reader for showing users + cls.reader_users_client = ( + cls.os_system_reader.users_v3_client) + # Use system reader for showing groups + cls.reader_groups_client = ( + cls.os_system_reader.groups_client) + else: + # Use admin client by default + cls.reader_domains_client = cls.domains_client + cls.reader_users_client = cls.users_client + cls.reader_groups_client = cls.groups_client + @classmethod def resource_setup(cls): super(DomainsTestJSON, cls).resource_setup() @@ -41,7 +62,7 @@ class DomainsTestJSON(base.BaseIdentityV3AdminTest): """Test listing domains""" fetched_ids = list() # List and Verify Domains - body = self.domains_client.list_domains()['domains'] + body = self.reader_domains_client.list_domains()['domains'] for d in body: fetched_ids.append(d['id']) missing_doms = [d for d in self.setup_domains @@ -52,7 +73,7 @@ class DomainsTestJSON(base.BaseIdentityV3AdminTest): def test_list_domains_filter_by_name(self): """Test listing domains filtering by name""" params = {'name': self.setup_domains[0]['name']} - fetched_domains = self.domains_client.list_domains( + fetched_domains = self.reader_domains_client.list_domains( **params)['domains'] # Verify the filtered list is correct, domain names are unique # so exactly one domain should be found with the provided name @@ -64,7 +85,7 @@ class DomainsTestJSON(base.BaseIdentityV3AdminTest): def test_list_domains_filter_by_enabled(self): """Test listing domains filtering by enabled domains""" params = {'enabled': True} - fetched_domains = self.domains_client.list_domains( + fetched_domains = self.reader_domains_client.list_domains( **params)['domains'] # Verify the filtered list is correct self.assertIn(self.setup_domains[0], fetched_domains) @@ -108,14 +129,14 @@ class DomainsTestJSON(base.BaseIdentityV3AdminTest): self.assertEqual(new_desc, updated_domain['description']) self.assertEqual(False, updated_domain['enabled']) # Show domain - fetched_domain = self.domains_client.show_domain( + fetched_domain = self.reader_domains_client.show_domain( domain['id'])['domain'] self.assertEqual(new_name, fetched_domain['name']) self.assertEqual(new_desc, fetched_domain['description']) self.assertEqual(False, fetched_domain['enabled']) # Delete domain self.domains_client.delete_domain(domain['id']) - body = self.domains_client.list_domains()['domains'] + body = self.reader_domains_client.list_domains()['domains'] domains_list = [d['id'] for d in body] self.assertNotIn(domain['id'], domains_list) @@ -130,11 +151,11 @@ class DomainsTestJSON(base.BaseIdentityV3AdminTest): self.delete_domain(domain['id']) # Check the domain, its users and groups are gone self.assertRaises(exceptions.NotFound, - self.domains_client.show_domain, domain['id']) + self.reader_domains_client.show_domain, domain['id']) self.assertRaises(exceptions.NotFound, - self.users_client.show_user, user['id']) + self.reader_users_client.show_user, user['id']) self.assertRaises(exceptions.NotFound, - self.groups_client.show_group, group['id']) + self.reader_groups_client.show_group, group['id']) @decorators.idempotent_id('036df86e-bb5d-42c0-a7c2-66b9db3a6046') def test_create_domain_with_disabled_status(self): diff --git a/tempest/api/identity/admin/v3/test_endpoints.py b/tempest/api/identity/admin/v3/test_endpoints.py index f9f3e7296d..defdcc76c2 100644 --- a/tempest/api/identity/admin/v3/test_endpoints.py +++ b/tempest/api/identity/admin/v3/test_endpoints.py @@ -30,10 +30,21 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest): # pre-provisioned credentials provider. force_tenant_isolation = False + credentials = ['primary', 'admin', 'system_reader'] + @classmethod def setup_clients(cls): super(EndPointsTestJSON, cls).setup_clients() cls.client = cls.endpoints_client + if CONF.identity.use_system_token: + # Use system reader for listing/showing endpoints + cls.reader_client = cls.os_system_reader.endpoints_v3_client + # Use system reader for showing regions + cls.reader_regions_client = cls.os_system_reader.regions_client + else: + # Use admin client by default + cls.reader_client = cls.client + cls.reader_regions_client = cls.regions_client @classmethod def resource_setup(cls): @@ -55,7 +66,8 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest): endpoint = cls.client.create_endpoint( service_id=cls.service_ids[i], interface=interfaces[i], url=url, region=region_name, enabled=True)['endpoint'] - region = cls.regions_client.show_region(region_name)['region'] + region = cls.reader_regions_client.show_region(region_name)[ + 'region'] cls.addClassResourceCleanup( cls.regions_client.delete_region, region['id']) cls.addClassResourceCleanup( @@ -81,7 +93,7 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest): def test_list_endpoints(self): """Test listing keystone endpoints by filters""" # Get the list of all the endpoints. - fetched_endpoints = self.client.list_endpoints()['endpoints'] + fetched_endpoints = self.reader_client.list_endpoints()['endpoints'] fetched_endpoint_ids = [e['id'] for e in fetched_endpoints] # Check that all the created endpoints are present in # "fetched_endpoints". @@ -93,9 +105,9 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest): ', '.join(str(e) for e in missing_endpoints)) # Check that filtering endpoints by service_id works. - fetched_endpoints_for_service = self.client.list_endpoints( + fetched_endpoints_for_service = self.reader_client.list_endpoints( service_id=self.service_ids[0])['endpoints'] - fetched_endpoints_for_alt_service = self.client.list_endpoints( + fetched_endpoints_for_alt_service = self.reader_client.list_endpoints( service_id=self.service_ids[1])['endpoints'] # Assert that both filters returned the correct result. @@ -106,9 +118,9 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest): fetched_endpoints_for_alt_service[0]['id']])) # Check that filtering endpoints by interface works. - fetched_public_endpoints = self.client.list_endpoints( + fetched_public_endpoints = self.reader_client.list_endpoints( interface='public')['endpoints'] - fetched_internal_endpoints = self.client.list_endpoints( + fetched_internal_endpoints = self.reader_client.list_endpoints( interface='internal')['endpoints'] # Check that the expected endpoint_id is present per filter. [0] is @@ -129,7 +141,7 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest): interface=interface, url=url, region=region_name, enabled=True)['endpoint'] - region = self.regions_client.show_region(region_name)['region'] + region = self.reader_regions_client.show_region(region_name)['region'] self.addCleanup(self.regions_client.delete_region, region['id']) self.addCleanup(test_utils.call_and_ignore_notfound_exc, self.client.delete_endpoint, endpoint['id']) @@ -138,13 +150,13 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest): self.assertEqual(url, endpoint['url']) # Checking if created endpoint is present in the list of endpoints - fetched_endpoints = self.client.list_endpoints()['endpoints'] + fetched_endpoints = self.reader_client.list_endpoints()['endpoints'] fetched_endpoints_id = [e['id'] for e in fetched_endpoints] self.assertIn(endpoint['id'], fetched_endpoints_id) # Show endpoint fetched_endpoint = ( - self.client.show_endpoint(endpoint['id'])['endpoint']) + self.reader_client.show_endpoint(endpoint['id'])['endpoint']) # Asserting if the attributes of endpoint are the same self.assertEqual(self.service_ids[0], fetched_endpoint['service_id']) self.assertEqual(interface, fetched_endpoint['interface']) @@ -156,7 +168,7 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest): self.client.delete_endpoint(endpoint['id']) # Checking whether endpoint is deleted successfully - fetched_endpoints = self.client.list_endpoints()['endpoints'] + fetched_endpoints = self.reader_client.list_endpoints()['endpoints'] fetched_endpoints_id = [e['id'] for e in fetched_endpoints] self.assertNotIn(endpoint['id'], fetched_endpoints_id) @@ -187,7 +199,8 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest): interface=interface1, url=url1, region=region1_name, enabled=True)['endpoint']) - region1 = self.regions_client.show_region(region1_name)['region'] + region1 = self.reader_regions_client.show_region(region1_name)[ + 'region'] self.addCleanup(self.regions_client.delete_region, region1['id']) # Updating endpoint with new values @@ -199,7 +212,8 @@ class EndPointsTestJSON(base.BaseIdentityV3AdminTest): interface=interface2, url=url2, region=region2_name, enabled=False)['endpoint'] - region2 = self.regions_client.show_region(region2_name)['region'] + region2 = self.reader_regions_client.show_region(region2_name)[ + 'region'] self.addCleanup(self.regions_client.delete_region, region2['id']) self.addCleanup(self.client.delete_endpoint, endpoint_for_update['id']) diff --git a/tempest/api/identity/admin/v3/test_groups.py b/tempest/api/identity/admin/v3/test_groups.py index 96218bbf4a..f704f02d17 100644 --- a/tempest/api/identity/admin/v3/test_groups.py +++ b/tempest/api/identity/admin/v3/test_groups.py @@ -30,6 +30,23 @@ class GroupsV3TestJSON(base.BaseIdentityV3AdminTest): # pre-provisioned credentials provider. force_tenant_isolation = False + credentials = ['primary', 'admin', 'system_reader'] + + @classmethod + def setup_clients(cls): + super(GroupsV3TestJSON, cls).setup_clients() + if CONF.identity.use_system_token: + # Use system reader for listing/showing groups + cls.reader_groups_client = ( + cls.os_system_reader.groups_client) + # Use system reader for listing user groups + cls.reader_users_client = ( + cls.os_system_reader.users_v3_client) + else: + # Use admin client by default + cls.reader_groups_client = cls.groups_client + cls.reader_users_client = cls.users_client + @classmethod def resource_setup(cls): super(GroupsV3TestJSON, cls).resource_setup() @@ -60,7 +77,7 @@ class GroupsV3TestJSON(base.BaseIdentityV3AdminTest): self.assertEqual(updated_group['description'], first_desc_update) # Verify that the updated values are reflected after performing show. - new_group = self.groups_client.show_group(group['id'])['group'] + new_group = self.reader_groups_client.show_group(group['id'])['group'] self.assertEqual(group['id'], new_group['id']) self.assertEqual(first_name_update, new_group['name']) self.assertEqual(first_desc_update, new_group['description']) @@ -94,7 +111,8 @@ class GroupsV3TestJSON(base.BaseIdentityV3AdminTest): self.groups_client.add_group_user(group['id'], user['id']) # list users in group - group_users = self.groups_client.list_group_users(group['id'])['users'] + group_users = self.reader_groups_client.list_group_users(group['id'])[ + 'users'] self.assertEqual(sorted(users, key=lambda k: k['name']), sorted(group_users, key=lambda k: k['name'])) # check and delete user in group @@ -102,7 +120,8 @@ class GroupsV3TestJSON(base.BaseIdentityV3AdminTest): self.groups_client.check_group_user_existence( group['id'], user['id']) self.groups_client.delete_group_user(group['id'], user['id']) - group_users = self.groups_client.list_group_users(group['id'])['users'] + group_users = self.reader_groups_client.list_group_users(group['id'])[ + 'users'] self.assertEqual(len(group_users), 0) @decorators.idempotent_id('64573281-d26a-4a52-b899-503cb0f4e4ec') @@ -121,7 +140,8 @@ class GroupsV3TestJSON(base.BaseIdentityV3AdminTest): groups.append(group) self.groups_client.add_group_user(group['id'], user['id']) # list groups which user belongs to - user_groups = self.users_client.list_user_groups(user['id'])['groups'] + user_groups = self.reader_users_client.list_user_groups(user['id'])[ + 'groups'] # The `membership_expires_at` attribute is present when listing user # group memberships, and is not an attribute of the groups themselves. # Therefore we remove it from the comparison. @@ -146,10 +166,10 @@ class GroupsV3TestJSON(base.BaseIdentityV3AdminTest): # of listing all users and listing all groups are not supported, # they need a domain filter to be specified if CONF.identity_feature_enabled.domain_specific_drivers: - body = self.groups_client.list_groups( + body = self.reader_groups_client.list_groups( domain_id=self.domain['id'])['groups'] else: - body = self.groups_client.list_groups()['groups'] + body = self.reader_groups_client.list_groups()['groups'] for g in body: fetched_ids.append(g['id']) missing_groups = [g for g in group_ids if g not in fetched_ids] diff --git a/tempest/api/identity/admin/v3/test_list_projects.py b/tempest/api/identity/admin/v3/test_list_projects.py index 2135fccdd1..c758dfa614 100644 --- a/tempest/api/identity/admin/v3/test_list_projects.py +++ b/tempest/api/identity/admin/v3/test_list_projects.py @@ -26,13 +26,26 @@ CONF = config.CONF class BaseListProjectsTestJSON(base.BaseIdentityV3AdminTest): + credentials = ['primary', 'admin', 'system_reader'] + + @classmethod + def setup_clients(cls): + super(BaseListProjectsTestJSON, cls).setup_clients() + if CONF.identity.use_system_token: + # Use system reader for listing projects + cls.reader_projects_client = ( + cls.os_system_reader.projects_client) + else: + # Use admin client by default + cls.reader_projects_client = cls.projects_client + def _list_projects_with_params(self, included, excluded, params, key): # Validate that projects in ``included`` belongs to the projects # returned that match ``params`` but not projects in ``excluded`` - all_projects = self.projects_client.list_projects()['projects'] + all_projects = self.reader_projects_client.list_projects()['projects'] LOG.debug("Complete list of projects available in keystone: %s", all_projects) - body = self.projects_client.list_projects(params)['projects'] + body = self.reader_projects_client.list_projects(params)['projects'] for p in included: self.assertIn(p[key], map(lambda x: x[key], body)) for p in excluded: @@ -75,7 +88,7 @@ class ListProjectsTestJSON(BaseListProjectsTestJSON): def test_list_projects_with_parent(self): """Test listing projects with parent""" params = {'parent_id': self.p3['parent_id']} - fetched_projects = self.projects_client.list_projects( + fetched_projects = self.reader_projects_client.list_projects( params)['projects'] self.assertNotEmpty(fetched_projects) for project in fetched_projects: @@ -111,10 +124,10 @@ class ListProjectsStaticTestJSON(BaseListProjectsTestJSON): @decorators.idempotent_id('1d830662-22ad-427c-8c3e-4ec854b0af44') def test_list_projects(self): """Test listing projects""" - list_projects = self.projects_client.list_projects()['projects'] + list_projects = self.reader_projects_client.list_projects()['projects'] for p in [self.p1, self.p2]: - show_project = self.projects_client.show_project(p['id'])[ + show_project = self.reader_projects_client.show_project(p['id'])[ 'project'] self.assertIn(show_project, list_projects) diff --git a/tempest/api/identity/admin/v3/test_list_users.py b/tempest/api/identity/admin/v3/test_list_users.py index 3884989086..e8d0ff563c 100644 --- a/tempest/api/identity/admin/v3/test_list_users.py +++ b/tempest/api/identity/admin/v3/test_list_users.py @@ -24,12 +24,25 @@ CONF = config.CONF class UsersV3TestJSON(base.BaseIdentityV3AdminTest): """Test listing keystone users""" + credentials = ['primary', 'admin', 'system_reader'] + + @classmethod + def setup_clients(cls): + super(UsersV3TestJSON, cls).setup_clients() + if CONF.identity.use_system_token: + # Use system reader for listing users + cls.reader_users_client = ( + cls.os_system_reader.users_v3_client) + else: + # Use admin client by default + cls.reader_users_client = cls.users_client + def _list_users_with_params(self, params, key, expected, not_expected): # Helper method to list users filtered with params and # assert the response based on expected and not_expected # expected: user expected in the list response # not_expected: user, which should not be present in list response - body = self.users_client.list_users(**params)['users'] + body = self.reader_users_client.list_users(**params)['users'] self.assertIn(expected[key], map(lambda x: x[key], body)) self.assertNotIn(not_expected[key], map(lambda x: x[key], body)) @@ -105,13 +118,13 @@ class UsersV3TestJSON(base.BaseIdentityV3AdminTest): # of listing all users and listing all groups are not supported, # they need a domain filter to be specified if CONF.identity_feature_enabled.domain_specific_drivers: - body_enabled_user = self.users_client.list_users( + body_enabled_user = self.reader_users_client.list_users( domain_id=self.domain_enabled_user['domain_id'])['users'] - body_non_enabled_user = self.users_client.list_users( + body_non_enabled_user = self.reader_users_client.list_users( domain_id=self.non_domain_enabled_user['domain_id'])['users'] body = (body_enabled_user + body_non_enabled_user) else: - body = self.users_client.list_users()['users'] + body = self.reader_users_client.list_users()['users'] fetched_ids = [u['id'] for u in body] missing_users = [u['id'] for u in self.users @@ -123,7 +136,7 @@ class UsersV3TestJSON(base.BaseIdentityV3AdminTest): @decorators.idempotent_id('b4baa3ae-ac00-4b4e-9e27-80deaad7771f') def test_get_user(self): """Get a user detail""" - user = self.users_client.show_user(self.users[0]['id'])['user'] + user = self.reader_users_client.show_user(self.users[0]['id'])['user'] self.assertEqual(self.users[0]['id'], user['id']) self.assertEqual(self.users[0]['name'], user['name']) self.assertEqual(self.alt_email, user['email']) diff --git a/tempest/api/identity/admin/v3/test_policies.py b/tempest/api/identity/admin/v3/test_policies.py index 2d3775a48d..6bce53380b 100644 --- a/tempest/api/identity/admin/v3/test_policies.py +++ b/tempest/api/identity/admin/v3/test_policies.py @@ -24,6 +24,19 @@ CONF = config.CONF class PoliciesTestJSON(base.BaseIdentityV3AdminTest): """Test keystone policies""" + credentials = ['primary', 'admin', 'system_reader'] + + @classmethod + def setup_clients(cls): + super(PoliciesTestJSON, cls).setup_clients() + if CONF.identity.use_system_token: + # Use system reader for listing/showing policies + cls.reader_policies_client = ( + cls.os_system_reader.policies_client) + else: + # Use admin client by default + cls.reader_policies_client = cls.policies_client + def _delete_policy(self, policy_id): self.policies_client.delete_policy(policy_id) @@ -43,7 +56,7 @@ class PoliciesTestJSON(base.BaseIdentityV3AdminTest): self.addCleanup(self._delete_policy, policy['id']) policy_ids.append(policy['id']) # List and Verify Policies - body = self.policies_client.list_policies()['policies'] + body = self.reader_policies_client.list_policies()['policies'] for p in body: fetched_ids.append(p['id']) missing_pols = [p for p in policy_ids if p not in fetched_ids] @@ -70,7 +83,7 @@ class PoliciesTestJSON(base.BaseIdentityV3AdminTest): policy['id'], type=update_type)['policy'] self.assertIn('type', data) # Assertion for updated value with fetched value - fetched_policy = self.policies_client.show_policy( + fetched_policy = self.reader_policies_client.show_policy( policy['id'])['policy'] self.assertIn('id', fetched_policy) self.assertIn('blob', fetched_policy) diff --git a/tempest/api/identity/admin/v3/test_projects.py b/tempest/api/identity/admin/v3/test_projects.py index 3b0052c8c4..c191955c07 100644 --- a/tempest/api/identity/admin/v3/test_projects.py +++ b/tempest/api/identity/admin/v3/test_projects.py @@ -30,6 +30,27 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest): # pre-provisioned credentials provider. force_tenant_isolation = False + credentials = ['primary', 'admin', 'system_reader'] + + @classmethod + def setup_clients(cls): + super(ProjectsTestJSON, cls).setup_clients() + if CONF.identity.use_system_token: + # Use system reader for listing/showing projects + cls.reader_projects_client = ( + cls.os_system_reader.projects_client) + # Use system reader for listing/showing domains + cls.reader_domains_client = ( + cls.os_system_reader.domains_client) + # Use system reader for showing users + cls.reader_users_client = ( + cls.os_system_reader.users_v3_client) + else: + # Use admin client by default + cls.reader_projects_client = cls.projects_client + cls.reader_domains_client = cls.domains_client + cls.reader_users_client = cls.users_client + @decorators.idempotent_id('0ecf465c-0dc4-4532-ab53-91ffeb74d12d') def test_project_create_with_description(self): """Test creating project with a description""" @@ -40,7 +61,7 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest): desc1 = project['description'] self.assertEqual(desc1, project_desc, 'Description should have ' 'been sent in response for create') - body = self.projects_client.show_project(project_id)['project'] + body = self.reader_projects_client.show_project(project_id)['project'] desc2 = body['description'] self.assertEqual(desc2, project_desc, 'Description does not appear ' 'to be set') @@ -56,7 +77,7 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest): project_id = project['id'] self.assertEqual(project_name, project['name']) self.assertEqual(domain['id'], project['domain_id']) - body = self.projects_client.show_project(project_id)['project'] + body = self.reader_projects_client.show_project(project_id)['project'] self.assertEqual(project_name, body['name']) self.assertEqual(domain['id'], body['domain_id']) @@ -97,15 +118,15 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest): # Check if the is_domain project is correctly returned by both # project and domain APIs - projects_list = self.projects_client.list_projects( + projects_list = self.reader_projects_client.list_projects( params={'is_domain': True})['projects'] project_ids = [p['id'] for p in projects_list] self.assertIn(project['id'], project_ids) # The domains API return different attributes for the entity, so we # compare the entities IDs - domains_ids = [d['id'] for d in self.domains_client.list_domains()[ - 'domains']] + domains_list = self.reader_domains_client.list_domains()['domains'] + domains_ids = [d['id'] for d in domains_list] self.assertIn(project['id'], domains_ids) @decorators.idempotent_id('1f66dc76-50cc-4741-a200-af984509e480') @@ -115,7 +136,7 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest): project_id = project['id'] self.assertTrue(project['enabled'], 'Enable should be True in response') - body = self.projects_client.show_project(project_id)['project'] + body = self.reader_projects_client.show_project(project_id)['project'] self.assertTrue(body['enabled'], 'Enable should be True in lookup') @decorators.idempotent_id('78f96a9c-e0e0-4ee6-a3ba-fbf6dfd03207') @@ -124,7 +145,8 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest): project = self.setup_test_project(enabled=False) self.assertFalse(project['enabled'], 'Enable should be False in response') - body = self.projects_client.show_project(project['id'])['project'] + body = self.reader_projects_client.show_project(project['id'])[ + 'project'] self.assertFalse(body['enabled'], 'Enable should be False in lookup') @@ -144,7 +166,8 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest): resp2_name = body['name'] self.assertNotEqual(resp1_name, resp2_name) - body = self.projects_client.show_project(project['id'])['project'] + body = self.reader_projects_client.show_project(project['id'])[ + 'project'] resp3_name = body['name'] self.assertNotEqual(resp1_name, resp3_name) @@ -166,7 +189,8 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest): resp2_desc = body['description'] self.assertNotEqual(resp1_desc, resp2_desc) - body = self.projects_client.show_project(project['id'])['project'] + body = self.reader_projects_client.show_project(project['id'])[ + 'project'] resp3_desc = body['description'] self.assertNotEqual(resp1_desc, resp3_desc) @@ -187,7 +211,8 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest): resp2_en = body['enabled'] self.assertNotEqual(resp1_en, resp2_en) - body = self.projects_client.show_project(project['id'])['project'] + body = self.reader_projects_client.show_project(project['id'])[ + 'project'] resp3_en = body['enabled'] self.assertNotEqual(resp1_en, resp3_en) @@ -217,7 +242,7 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest): self.addCleanup(self.users_client.delete_user, user['id']) # Get User To validate the user details - new_user_get = self.users_client.show_user(user['id'])['user'] + new_user_get = self.reader_users_client.show_user(user['id'])['user'] # Assert response body of GET self.assertEqual(u_name, new_user_get['name']) self.assertEqual(u_desc, new_user_get['description']) @@ -238,9 +263,9 @@ class ProjectsTestJSON(base.BaseIdentityV3AdminTest): project = self.setup_test_project(tags=tags) # Show and list for the project - project_get = self.projects_client.show_project( + project_get = self.reader_projects_client.show_project( project['id'])['project'] - _projects = self.projects_client.list_projects()['projects'] + _projects = self.reader_projects_client.list_projects()['projects'] project_list = next(x for x in _projects if x['id'] == project['id']) # Assert the expected fields exist. More fields than expected may diff --git a/tempest/api/identity/admin/v3/test_regions.py b/tempest/api/identity/admin/v3/test_regions.py index 870a406f0c..f021cc2e74 100644 --- a/tempest/api/identity/admin/v3/test_regions.py +++ b/tempest/api/identity/admin/v3/test_regions.py @@ -30,10 +30,18 @@ class RegionsTestJSON(base.BaseIdentityV3AdminTest): # pre-provisioned credentials provider. force_tenant_isolation = False + credentials = ['primary', 'admin', 'system_reader'] + @classmethod def setup_clients(cls): super(RegionsTestJSON, cls).setup_clients() cls.client = cls.regions_client + if CONF.identity.use_system_token: + # Use system reader for listing/showing regions + cls.reader_client = cls.os_system_reader.regions_client + else: + # Use admin client by default + cls.reader_client = cls.client @classmethod def resource_setup(cls): @@ -77,13 +85,13 @@ class RegionsTestJSON(base.BaseIdentityV3AdminTest): self.assertEqual(self.setup_regions[1]['id'], region['parent_region_id']) # Get the details of region - region = self.client.show_region(region['id'])['region'] + region = self.reader_client.show_region(region['id'])['region'] self.assertEqual(r_alt_description, region['description']) self.assertEqual(self.setup_regions[1]['id'], region['parent_region_id']) # Delete the region self.client.delete_region(region['id']) - body = self.client.list_regions()['regions'] + body = self.reader_client.list_regions()['regions'] regions_list = [r['id'] for r in body] self.assertNotIn(region['id'], regions_list) @@ -104,7 +112,7 @@ class RegionsTestJSON(base.BaseIdentityV3AdminTest): @decorators.idempotent_id('d180bf99-544a-445c-ad0d-0c0d27663796') def test_list_regions(self): """Test getting a list of regions""" - fetched_regions = self.client.list_regions()['regions'] + fetched_regions = self.reader_client.list_regions()['regions'] missing_regions =\ [e for e in self.setup_regions if e not in fetched_regions] # Asserting List Regions response @@ -124,7 +132,8 @@ class RegionsTestJSON(base.BaseIdentityV3AdminTest): self.addCleanup(self.client.delete_region, region['id']) # Get the list of regions filtering with the parent_region_id params = {'parent_region_id': self.setup_regions[0]['id']} - fetched_regions = self.client.list_regions(params=params)['regions'] + fetched_regions = self.reader_client.list_regions(params=params)[ + 'regions'] # Asserting list regions response self.assertIn(region, fetched_regions) for r in fetched_regions: diff --git a/tempest/api/identity/admin/v3/test_roles.py b/tempest/api/identity/admin/v3/test_roles.py index ab96027858..d1c90dcf6c 100644 --- a/tempest/api/identity/admin/v3/test_roles.py +++ b/tempest/api/identity/admin/v3/test_roles.py @@ -32,6 +32,19 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): # pre-provisioned credentials provider. force_tenant_isolation = False + credentials = ['primary', 'admin', 'system_reader'] + + @classmethod + def setup_clients(cls): + super(RolesV3TestJSON, cls).setup_clients() + if CONF.identity.use_system_token: + # Use system reader for listing/showing roles + cls.reader_roles_client = ( + cls.os_system_reader.roles_v3_client) + else: + # Use admin client by default + cls.reader_roles_client = cls.roles_client + @classmethod def resource_setup(cls): super(RolesV3TestJSON, cls).resource_setup() @@ -97,11 +110,11 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): self.assertIn('links', updated_role) self.assertNotEqual(r_name, updated_role['name']) - new_role = self.roles_client.show_role(role['id'])['role'] + new_role = self.reader_roles_client.show_role(role['id'])['role'] self.assertEqual(new_name, new_role['name']) self.assertEqual(updated_role['id'], new_role['id']) - roles = self.roles_client.list_roles()['roles'] + roles = self.reader_roles_client.list_roles()['roles'] self.assertIn(role['id'], [r['id'] for r in roles]) @decorators.idempotent_id('c6b80012-fe4a-498b-9ce8-eb391c05169f') @@ -114,7 +127,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): self.user_body['id'], self.role['id']) - roles = self.roles_client.list_user_roles_on_project( + roles = self.reader_roles_client.list_user_roles_on_project( self.project['id'], self.user_body['id'])['roles'] self.assertEqual(1, len(roles)) @@ -135,7 +148,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): self.roles_client.create_user_role_on_domain( self.domain['id'], self.user_body['id'], self.role['id']) - roles = self.roles_client.list_user_roles_on_domain( + roles = self.reader_roles_client.list_user_roles_on_domain( self.domain['id'], self.user_body['id'])['roles'] self.assertEqual(1, len(roles)) @@ -155,7 +168,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): self.roles_client.create_user_role_on_system( self.user_body['id'], self.role['id']) - roles = self.roles_client.list_user_roles_on_system( + roles = self.reader_roles_client.list_user_roles_on_system( self.user_body['id'])['roles'] self.assertEqual(1, len(roles)) @@ -177,7 +190,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): self.roles_client.create_group_role_on_project( self.project['id'], self.group_body['id'], self.role['id']) # List group roles on project - roles = self.roles_client.list_group_roles_on_project( + roles = self.reader_roles_client.list_group_roles_on_project( self.project['id'], self.group_body['id'])['roles'] self.assertEqual(1, len(roles)) @@ -210,7 +223,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): self.roles_client.create_group_role_on_domain( self.domain['id'], self.group_body['id'], self.role['id']) - roles = self.roles_client.list_group_roles_on_domain( + roles = self.reader_roles_client.list_group_roles_on_domain( self.domain['id'], self.group_body['id'])['roles'] self.assertEqual(1, len(roles)) @@ -227,7 +240,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): self.roles_client.create_group_role_on_system( self.group_body['id'], self.role['id']) - roles = self.roles_client.list_group_roles_on_system( + roles = self.reader_roles_client.list_group_roles_on_system( self.group_body['id'])['roles'] self.assertEqual(1, len(roles)) @@ -243,7 +256,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): def test_list_roles(self): """Test listing roles""" # Return a list of all roles - body = self.roles_client.list_roles()['roles'] + body = self.reader_roles_client.list_roles()['roles'] found = [role for role in body if role in self.roles] self.assertEqual(len(found), len(self.roles)) @@ -278,7 +291,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): prior_role_id, implies_role_id) # Show the inference rule and check its elements - resp_body = self.roles_client.show_role_inference_rule( + resp_body = self.reader_roles_client.show_role_inference_rule( prior_role_id, implies_role_id) self.assertIn('role_inference', resp_body) role_inference = resp_body['role_inference'] @@ -293,7 +306,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): # Check if the inference rule no longer exists self.assertRaises( lib_exc.NotFound, - self.roles_client.show_role_inference_rule, + self.reader_roles_client.show_role_inference_rule, prior_role_id, implies_role_id) @@ -313,14 +326,14 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): self.roles[2]['id'], self.role['id']) # Listing inferences rules from "roles[2]" should only return "role" - rules = self.roles_client.list_role_inferences_rules( + rules = self.reader_roles_client.list_role_inferences_rules( self.roles[2]['id'])['role_inference'] self.assertEqual(1, len(rules['implies'])) self.assertEqual(self.role['id'], rules['implies'][0]['id']) # Listing inferences rules from "roles[0]" should return "roles[1]" and # "roles[2]" (only direct rules are listed) - rules = self.roles_client.list_role_inferences_rules( + rules = self.reader_roles_client.list_role_inferences_rules( self.roles[0]['id'])['role_inference'] implies_ids = [role['id'] for role in rules['implies']] self.assertEqual(2, len(implies_ids)) @@ -384,13 +397,13 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): self.roles_client.delete_role, domain_role['id']) - domain_roles = self.roles_client.list_roles( + domain_roles = self.reader_roles_client.list_roles( domain_id=self.domain['id'])['roles'] self.assertEqual(1, len(domain_roles)) self.assertIn(domain_role, domain_roles) self.roles_client.delete_role(domain_role['id']) - domain_roles = self.roles_client.list_roles( + domain_roles = self.reader_roles_client.list_roles( domain_id=self.domain['id'])['roles'] self.assertEmpty(domain_roles) @@ -465,7 +478,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest): self._create_implied_role( self.roles[2]['id'], self.role['id']) - rules = self.roles_client.list_all_role_inference_rules()[ + rules = self.reader_roles_client.list_all_role_inference_rules()[ 'role_inferences'] # NOTE(jaosorior): With the work related to the define-default-roles diff --git a/tempest/api/identity/admin/v3/test_services.py b/tempest/api/identity/admin/v3/test_services.py index b67e175256..3379c3ef16 100644 --- a/tempest/api/identity/admin/v3/test_services.py +++ b/tempest/api/identity/admin/v3/test_services.py @@ -25,11 +25,25 @@ CONF = config.CONF class ServicesTestJSON(base.BaseIdentityV3AdminTest): """Test keystone services""" + credentials = ['primary', 'admin', 'system_reader'] + + @classmethod + def setup_clients(cls): + super(ServicesTestJSON, cls).setup_clients() + if CONF.identity.use_system_token: + # Use system reader for listing/showing services + cls.reader_services_client = ( + cls.os_system_reader.identity_services_v3_client) + else: + # Use admin client by default + cls.reader_services_client = cls.services_client + def _del_service(self, service_id): # Used for deleting the services created in this class self.services_client.delete_service(service_id) # Checking whether service is deleted successfully - self.assertRaises(lib_exc.NotFound, self.services_client.show_service, + self.assertRaises(lib_exc.NotFound, + self.reader_services_client.show_service, service_id) @decorators.attr(type='smoke') @@ -61,7 +75,8 @@ class ServicesTestJSON(base.BaseIdentityV3AdminTest): self.assertNotEqual(resp1_desc, resp2_desc) # Get service - fetched_service = self.services_client.show_service(s_id)['service'] + fetched_service = self.reader_services_client.show_service(s_id)[ + 'service'] resp3_desc = fetched_service['description'] self.assertEqual(resp2_desc, resp3_desc) @@ -100,14 +115,14 @@ class ServicesTestJSON(base.BaseIdentityV3AdminTest): service_types.append(serv_type) # List and Verify Services - services = self.services_client.list_services()['services'] + services = self.reader_services_client.list_services()['services'] fetched_ids = [service['id'] for service in services] found = [s for s in fetched_ids if s in service_ids] self.assertEqual(len(found), len(service_ids)) # Check that filtering by service type works. for serv_type in service_types: - fetched_services = self.services_client.list_services( + fetched_services = self.reader_services_client.list_services( type=serv_type)['services'] self.assertEqual(1, len(fetched_services)) self.assertEqual(serv_type, fetched_services[0]['type']) diff --git a/tempest/api/identity/admin/v3/test_trusts.py b/tempest/api/identity/admin/v3/test_trusts.py index 5bd6756713..d843abf535 100644 --- a/tempest/api/identity/admin/v3/test_trusts.py +++ b/tempest/api/identity/admin/v3/test_trusts.py @@ -29,6 +29,19 @@ CONF = config.CONF class TrustsV3TestJSON(base.BaseIdentityV3AdminTest): """Test keystone trusts""" + credentials = ['primary', 'admin', 'system_reader'] + + @classmethod + def setup_clients(cls): + super(TrustsV3TestJSON, cls).setup_clients() + if CONF.identity.use_system_token: + # Use system reader for listing trusts + cls.reader_trusts_client = ( + cls.os_system_reader.trusts_client) + else: + # Use admin client by default + cls.reader_trusts_client = cls.trusts_client + @classmethod def skip_checks(cls): super(TrustsV3TestJSON, cls).skip_checks() @@ -293,7 +306,7 @@ class TrustsV3TestJSON(base.BaseIdentityV3AdminTest): original_scope = self.os_admin.auth_provider.scope set_scope(self.os_admin.auth_provider, 'project') self.addCleanup(set_scope, self.os_admin.auth_provider, original_scope) - trusts_get = self.trusts_client.list_trusts()['trusts'] + trusts_get = self.reader_trusts_client.list_trusts()['trusts'] trusts = [t for t in trusts_get if t['id'] == self.trust_id] self.assertEqual(1, len(trusts)) diff --git a/tempest/api/identity/admin/v3/test_users.py b/tempest/api/identity/admin/v3/test_users.py index 9bcbba5858..1272adb625 100644 --- a/tempest/api/identity/admin/v3/test_users.py +++ b/tempest/api/identity/admin/v3/test_users.py @@ -29,6 +29,27 @@ CONF = config.CONF class UsersV3TestJSON(base.BaseIdentityV3AdminTest): """Test keystone users""" + credentials = ['primary', 'admin', 'system_reader'] + + @classmethod + def setup_clients(cls): + super(UsersV3TestJSON, cls).setup_clients() + if CONF.identity.use_system_token: + # Use system reader for listing/showing users + cls.reader_users_client = ( + cls.os_system_reader.users_v3_client) + # Use system reader for showing roles + cls.reader_roles_client = ( + cls.os_system_reader.roles_v3_client) + # Use system reader for showing projects + cls.reader_projects_client = ( + cls.os_system_reader.projects_client) + else: + # Use admin client by default + cls.reader_users_client = cls.users_client + cls.reader_roles_client = cls.roles_client + cls.reader_projects_client = cls.projects_client + @classmethod def skip_checks(cls): super(UsersV3TestJSON, cls).skip_checks() @@ -67,7 +88,7 @@ class UsersV3TestJSON(base.BaseIdentityV3AdminTest): self.assertEqual(update_kwargs[field], updated_user[field]) # GET by id after updating - new_user_get = self.users_client.show_user(user['id'])['user'] + new_user_get = self.reader_users_client.show_user(user['id'])['user'] # Assert response body of GET after updation for field in update_kwargs: self.assertEqual(update_kwargs[field], new_user_get[field]) @@ -120,19 +141,20 @@ class UsersV3TestJSON(base.BaseIdentityV3AdminTest): # Creating Role role_body = self.setup_test_role() - user = self.users_client.show_user(user_body['id'])['user'] - role = self.roles_client.show_role(role_body['id'])['role'] + user = self.reader_users_client.show_user(user_body['id'])['user'] + role = self.reader_roles_client.show_role(role_body['id'])['role'] for _ in range(2): # Creating project so as to assign role project_body = self.setup_test_project() - project = self.projects_client.show_project( + project = self.reader_projects_client.show_project( project_body['id'])['project'] # Assigning roles to user on project self.roles_client.create_user_role_on_project(project['id'], user['id'], role['id']) assigned_project_ids.append(project['id']) - body = self.users_client.list_user_projects(user['id'])['projects'] + body = self.reader_users_client.list_user_projects(user['id'])[ + 'projects'] for i in body: fetched_project_ids.append(i['id']) # verifying the project ids in list @@ -148,7 +170,7 @@ class UsersV3TestJSON(base.BaseIdentityV3AdminTest): def test_get_user(self): """Test getting a user detail""" user = self.setup_test_user() - fetched_user = self.users_client.show_user(user['id'])['user'] + fetched_user = self.reader_users_client.show_user(user['id'])['user'] self.assertEqual(user['id'], fetched_user['id']) @testtools.skipUnless(CONF.identity_feature_enabled.security_compliance,