Split roles_client for keystone v3 client

Partially implements blueprint consistent-service-method-names

Change-Id: Ib2b71d210f479f636e2998ae241117c8706286f3
This commit is contained in:
Arx Cruz 2016-02-10 15:20:16 +01:00 committed by Daniel Mellado
parent 74afe236d3
commit 24bcb88767
13 changed files with 264 additions and 247 deletions

View File

@ -71,8 +71,8 @@ class TestDefaultProjectId (base.BaseIdentityV3AdminTest):
admin_role_id = admin_role['id']
# grant the admin role to the user on his project
self.client.assign_user_role_on_project(proj_id, user_id,
admin_role_id)
self.roles_client.assign_user_role_on_project(proj_id, user_id,
admin_role_id)
# create a new client with user's credentials (NOTE: unscoped token!)
creds = auth.KeystoneV3Credentials(username=user_name,

View File

@ -25,7 +25,7 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
super(RolesV3TestJSON, cls).resource_setup()
for _ in range(3):
role_name = data_utils.rand_name(name='role')
role = cls.client.create_role(name=role_name)['role']
role = cls.roles_client.create_role(name=role_name)['role']
cls.data.roles.append(role)
cls.fetched_role_ids = list()
u_name = data_utils.rand_name('user')
@ -46,12 +46,12 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
u_name, description=u_desc, password=cls.u_password,
email=u_email, project_id=cls.project['id'],
domain_id=cls.domain['id'])['user']
cls.role = cls.client.create_role(
cls.role = cls.roles_client.create_role(
name=data_utils.rand_name('Role'))['role']
@classmethod
def resource_cleanup(cls):
cls.client.delete_role(cls.role['id'])
cls.roles_client.delete_role(cls.role['id'])
cls.groups_client.delete_group(cls.group_body['id'])
cls.users_client.delete_user(cls.user_body['id'])
cls.projects_client.delete_project(cls.project['id'])
@ -67,34 +67,35 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
@test.attr(type='smoke')
@test.idempotent_id('18afc6c0-46cf-4911-824e-9989cc056c3a')
def test_role_create_update_get_list(self):
def test_role_create_update_show_list(self):
r_name = data_utils.rand_name('Role')
role = self.client.create_role(name=r_name)['role']
self.addCleanup(self.client.delete_role, role['id'])
role = self.roles_client.create_role(name=r_name)['role']
self.addCleanup(self.roles_client.delete_role, role['id'])
self.assertIn('name', role)
self.assertEqual(role['name'], r_name)
new_name = data_utils.rand_name('NewRole')
updated_role = self.client.update_role(role['id'],
name=new_name)['role']
updated_role = self.roles_client.update_role(role['id'],
name=new_name)['role']
self.assertIn('name', updated_role)
self.assertIn('id', updated_role)
self.assertIn('links', updated_role)
self.assertNotEqual(r_name, updated_role['name'])
new_role = self.client.show_role(role['id'])['role']
new_role = self.roles_client.show_role(role['id'])['role']
self.assertEqual(new_name, new_role['name'])
self.assertEqual(updated_role['id'], new_role['id'])
roles = self.client.list_roles()['roles']
roles = self.roles_client.list_roles()['roles']
self.assertIn(role['id'], [r['id'] for r in roles])
@test.idempotent_id('c6b80012-fe4a-498b-9ce8-eb391c05169f')
def test_grant_list_revoke_role_to_user_on_project(self):
self.client.assign_user_role_on_project(
self.project['id'], self.user_body['id'], self.role['id'])
self.roles_client.assign_user_role_on_project(self.project['id'],
self.user_body['id'],
self.role['id'])
roles = self.client.list_user_roles_on_project(
roles = self.roles_client.list_user_roles_on_project(
self.project['id'], self.user_body['id'])['roles']
for i in roles:
@ -103,18 +104,18 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self._list_assertions(roles, self.fetched_role_ids,
self.role['id'])
self.client.check_user_role_existence_on_project(
self.roles_client.check_user_role_existence_on_project(
self.project['id'], self.user_body['id'], self.role['id'])
self.client.delete_role_from_user_on_project(
self.roles_client.delete_role_from_user_on_project(
self.project['id'], self.user_body['id'], self.role['id'])
@test.idempotent_id('6c9a2940-3625-43a3-ac02-5dcec62ef3bd')
def test_grant_list_revoke_role_to_user_on_domain(self):
self.client.assign_user_role_on_domain(
self.roles_client.assign_user_role_on_domain(
self.domain['id'], self.user_body['id'], self.role['id'])
roles = self.client.list_user_roles_on_domain(
roles = self.roles_client.list_user_roles_on_domain(
self.domain['id'], self.user_body['id'])['roles']
for i in roles:
@ -123,19 +124,19 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self._list_assertions(roles, self.fetched_role_ids,
self.role['id'])
self.client.check_user_role_existence_on_domain(
self.roles_client.check_user_role_existence_on_domain(
self.domain['id'], self.user_body['id'], self.role['id'])
self.client.delete_role_from_user_on_domain(
self.roles_client.delete_role_from_user_on_domain(
self.domain['id'], self.user_body['id'], self.role['id'])
@test.idempotent_id('cbf11737-1904-4690-9613-97bcbb3df1c4')
def test_grant_list_revoke_role_to_group_on_project(self):
# Grant role to group on project
self.client.assign_group_role_on_project(
self.roles_client.assign_group_role_on_project(
self.project['id'], self.group_body['id'], self.role['id'])
# List group roles on project
roles = self.client.list_group_roles_on_project(
roles = self.roles_client.list_group_roles_on_project(
self.project['id'], self.group_body['id'])['roles']
for i in roles:
@ -157,19 +158,19 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self.assertEqual(len(roles), 1)
self.assertEqual(roles[0]['id'], self.role['id'])
self.client.check_role_from_group_on_project_existence(
self.roles_client.check_role_from_group_on_project_existence(
self.project['id'], self.group_body['id'], self.role['id'])
# Revoke role to group on project
self.client.delete_role_from_group_on_project(
self.roles_client.delete_role_from_group_on_project(
self.project['id'], self.group_body['id'], self.role['id'])
@test.idempotent_id('4bf8a70b-e785-413a-ad53-9f91ce02faa7')
def test_grant_list_revoke_role_to_group_on_domain(self):
self.client.assign_group_role_on_domain(
self.roles_client.assign_group_role_on_domain(
self.domain['id'], self.group_body['id'], self.role['id'])
roles = self.client.list_group_roles_on_domain(
roles = self.roles_client.list_group_roles_on_domain(
self.domain['id'], self.group_body['id'])['roles']
for i in roles:
@ -178,15 +179,15 @@ class RolesV3TestJSON(base.BaseIdentityV3AdminTest):
self._list_assertions(roles, self.fetched_role_ids,
self.role['id'])
self.client.check_role_from_group_on_domain_existence(
self.roles_client.check_role_from_group_on_domain_existence(
self.domain['id'], self.group_body['id'], self.role['id'])
self.client.delete_role_from_group_on_domain(
self.roles_client.delete_role_from_group_on_domain(
self.domain['id'], self.group_body['id'], self.role['id'])
@test.idempotent_id('f5654bcc-08c4-4f71-88fe-05d64e06de94')
def test_list_roles(self):
# Return a list of all roles
body = self.client.list_roles()['roles']
body = self.roles_client.list_roles()['roles']
found = [role for role in body if role in self.data.roles]
self.assertEqual(len(found), len(self.data.roles))

View File

@ -77,15 +77,17 @@ class TokensV3TestJSON(base.BaseIdentityV3AdminTest):
# Create a role
role_name = data_utils.rand_name(name='role')
role = self.client.create_role(name=role_name)['role']
self.addCleanup(self.client.delete_role, role['id'])
role = self.roles_client.create_role(name=role_name)['role']
self.addCleanup(self.roles_client.delete_role, role['id'])
# Grant the user the role on both projects.
self.client.assign_user_role_on_project(project1['id'], user['id'],
role['id'])
self.roles_client.assign_user_role_on_project(project1['id'],
user['id'],
role['id'])
self.client.assign_user_role_on_project(project2['id'], user['id'],
role['id'])
self.roles_client.assign_user_role_on_project(project2['id'],
user['id'],
role['id'])
# Get an unscoped token.
token_auth = self.token.auth(user_id=user['id'],

View File

@ -69,19 +69,22 @@ class BaseTrustsV3Test(base.BaseIdentityV3AdminTest):
self.delegated_role = data_utils.rand_name('DelegatedRole')
self.not_delegated_role = data_utils.rand_name('NotDelegatedRole')
role = self.client.create_role(name=self.delegated_role)['role']
role = self.roles_client.create_role(name=self.delegated_role)['role']
self.delegated_role_id = role['id']
role = self.client.create_role(name=self.not_delegated_role)['role']
role = self.roles_client.create_role(
name=self.not_delegated_role)['role']
self.not_delegated_role_id = role['id']
# Assign roles to trustor
self.client.assign_user_role_on_project(self.trustor_project_id,
self.trustor_user_id,
self.delegated_role_id)
self.client.assign_user_role_on_project(self.trustor_project_id,
self.trustor_user_id,
self.not_delegated_role_id)
self.roles_client.assign_user_role_on_project(
self.trustor_project_id,
self.trustor_user_id,
self.delegated_role_id)
self.roles_client.assign_user_role_on_project(
self.trustor_project_id,
self.trustor_user_id,
self.not_delegated_role_id)
# Get trustee user ID, use the demo user
trustee_username = self.non_admin_client.user
@ -105,9 +108,9 @@ class BaseTrustsV3Test(base.BaseIdentityV3AdminTest):
if self.trustor_project_id:
self.projects_client.delete_project(self.trustor_project_id)
if self.delegated_role_id:
self.client.delete_role(self.delegated_role_id)
self.roles_client.delete_role(self.delegated_role_id)
if self.not_delegated_role_id:
self.client.delete_role(self.not_delegated_role_id)
self.roles_client.delete_role(self.not_delegated_role_id)
def create_trust(self, impersonate=True, expires=None):

View File

@ -117,13 +117,13 @@ class UsersV3TestJSON(base.BaseIdentityV3AdminTest):
# Delete the User at the end of this method
self.addCleanup(self.users_client.delete_user, user_body['id'])
# Creating Role
role_body = self.client.create_role(
role_body = self.roles_client.create_role(
name=data_utils.rand_name('role'))['role']
# Delete the Role at the end of this method
self.addCleanup(self.client.delete_role, role_body['id'])
self.addCleanup(self.roles_client.delete_role, role_body['id'])
user = self.users_client.show_user(user_body['id'])['user']
role = self.client.show_role(role_body['id'])['role']
role = self.roles_client.show_role(role_body['id'])['role']
for i in range(2):
# Creating project so as to assign role
project_body = self.projects_client.create_project(
@ -135,9 +135,9 @@ class UsersV3TestJSON(base.BaseIdentityV3AdminTest):
self.addCleanup(
self.projects_client.delete_project, project_body['id'])
# Assigning roles to user on project
self.client.assign_user_role_on_project(project['id'],
user['id'],
role['id'])
self.roles_client.assign_user_role_on_project(project['id'],
user['id'],
role['id'])
assigned_project_ids.append(project['id'])
body = self.users_client.list_user_projects(user['id'])['projects']
for i in body:

View File

@ -112,8 +112,8 @@ class BaseIdentityV2AdminTest(BaseIdentityV2Test):
@classmethod
def resource_setup(cls):
super(BaseIdentityV2AdminTest, cls).resource_setup()
cls.data = DataGeneratorV2(cls.client, cls.tenants_client,
cls.users_client, cls.roles_client)
cls.data = DataGeneratorV2(cls.tenants_client, cls.users_client,
cls.roles_client)
@classmethod
def resource_cleanup(cls):
@ -153,6 +153,7 @@ class BaseIdentityV3AdminTest(BaseIdentityV3Test):
cls.domains_client = cls.os_adm.domains_client
cls.users_client = cls.os_adm.users_v3_client
cls.trusts_client = cls.os_adm.trusts_client
cls.roles_client = cls.os_adm.roles_v3_client
cls.token = cls.os_adm.token_v3_client
cls.endpoints_client = cls.os_adm.endpoints_client
cls.regions_client = cls.os_adm.regions_client
@ -165,21 +166,14 @@ class BaseIdentityV3AdminTest(BaseIdentityV3Test):
@classmethod
def resource_setup(cls):
super(BaseIdentityV3AdminTest, cls).resource_setup()
cls.data = DataGeneratorV3(cls.client, cls.projects_client,
cls.users_client, None, cls.domains_client)
cls.data = DataGeneratorV3(cls.projects_client, cls.users_client,
cls.roles_client, cls.domains_client)
@classmethod
def resource_cleanup(cls):
cls.data.teardown_all()
super(BaseIdentityV3AdminTest, cls).resource_cleanup()
@classmethod
def get_role_by_name(cls, name):
roles = cls.client.list_roles()['roles']
role = [r for r in roles if r['name'] == name]
if len(role) > 0:
return role[0]
@classmethod
def disable_user(cls, user_name, domain_id=None):
user = cls.get_user_by_name(user_name, domain_id)
@ -194,12 +188,11 @@ class BaseIdentityV3AdminTest(BaseIdentityV3Test):
class BaseDataGenerator(object):
def __init__(self, client, projects_client,
users_client, roles_client=None, domains_client=None):
self.client = client
def __init__(self, projects_client, users_client, roles_client,
domains_client=None):
self.projects_client = projects_client
self.users_client = users_client
self.roles_client = roles_client or client
self.roles_client = roles_client
self.domains_client = domains_client
self.user_password = None

View File

@ -128,6 +128,8 @@ from tempest.services.identity.v3.json.policies_client import \
from tempest.services.identity.v3.json.projects_client import ProjectsClient
from tempest.services.identity.v3.json.regions_client import \
RegionsClient as RegionsV3Client
from tempest.services.identity.v3.json.roles_client import \
RolesClient as RolesV3Client
from tempest.services.identity.v3.json.services_client import \
ServicesClient as IdentityServicesV3Client
from tempest.services.identity.v3.json.trusts_client import TrustsClient
@ -519,6 +521,7 @@ class Manager(manager.Manager):
self.users_v3_client = UsersV3Client(self.auth_provider, **params_v3)
self.endpoints_client = EndPointV3Client(self.auth_provider,
**params_v3)
self.roles_v3_client = RolesV3Client(self.auth_provider, **params_v3)
self.identity_services_client = IdentityServicesV3Client(
self.auth_provider, **params_v3)
self.policies_client = PoliciesV3Client(self.auth_provider,

View File

@ -32,14 +32,12 @@ class CredsClient(object):
"""
def __init__(self, identity_client, projects_client, users_client,
roles_client=None):
roles_client):
# The client implies version and credentials
self.identity_client = identity_client
self.users_client = users_client
self.projects_client = projects_client
# this is temporary until the v3 clients are
# separated, then using *only* each client will become mandatory
self.roles_client = roles_client or identity_client
self.roles_client = roles_client
def create_user(self, username, password, project, email):
user = self.users_client.create_user(
@ -130,9 +128,9 @@ class V2CredsClient(CredsClient):
class V3CredsClient(CredsClient):
def __init__(self, identity_client, projects_client, users_client,
domains_client, domain_name):
roles_client, domains_client, domain_name):
super(V3CredsClient, self).__init__(identity_client, projects_client,
users_client)
users_client, roles_client)
self.domains_client = domains_client
try:
@ -167,10 +165,6 @@ class V3CredsClient(CredsClient):
project_domain_id=self.creds_domain['id'],
project_domain_name=self.creds_domain['name'])
def _list_roles(self):
roles = self.identity_client.list_roles()['roles']
return roles
def _assign_user_role(self, project, user, role):
self.roles_client.assign_user_role_on_project(project['id'],
user['id'],
@ -180,7 +174,7 @@ class V3CredsClient(CredsClient):
def get_creds_client(identity_client,
projects_client,
users_client,
roles_client=None,
roles_client,
domains_client=None,
project_domain_name=None):
if isinstance(identity_client, v2_identity.IdentityClient):
@ -188,4 +182,4 @@ def get_creds_client(identity_client,
roles_client)
else:
return V3CredsClient(identity_client, projects_client, users_client,
domains_client, project_domain_name)
roles_client, domains_client, project_domain_name)

View File

@ -97,7 +97,7 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
os.security_groups_client)
else:
return (os.identity_v3_client, os.projects_client,
os.users_v3_client, None, os.domains_client,
os.users_v3_client, os.roles_v3_client, os.domains_client,
os.network_client, os.networks_client, os.subnets_client,
os.ports_client, os.security_groups_client)

View File

@ -29,50 +29,6 @@ class IdentityV3Client(service_client.ServiceClient):
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def create_role(self, **kwargs):
"""Create a Role.
Available params: see http://developer.openstack.org/
api-ref-identity-v3.html#createRole
"""
post_body = json.dumps({'role': kwargs})
resp, body = self.post('roles', post_body)
self.expected_success(201, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def show_role(self, role_id):
"""GET a Role."""
resp, body = self.get('roles/%s' % str(role_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def list_roles(self):
"""Get the list of Roles."""
resp, body = self.get("roles")
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def update_role(self, role_id, **kwargs):
"""Update a Role.
Available params: see http://developer.openstack.org/
api-ref-identity-v3.html#updateRole
"""
post_body = json.dumps({'role': kwargs})
resp, body = self.patch('roles/%s' % str(role_id), post_body)
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def delete_role(self, role_id):
"""Delete a role."""
resp, body = self.delete('roles/%s' % str(role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def show_token(self, resp_token):
"""Get token details."""
headers = {'X-Subject-Token': resp_token}
@ -87,123 +43,3 @@ class IdentityV3Client(service_client.ServiceClient):
resp, body = self.delete("auth/tokens", headers=headers)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def assign_user_role_on_project(self, project_id, user_id, role_id):
"""Add roles to a user on a project."""
resp, body = self.put('projects/%s/users/%s/roles/%s' %
(project_id, user_id, role_id), None)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def assign_user_role_on_domain(self, domain_id, user_id, role_id):
"""Add roles to a user on a domain."""
resp, body = self.put('domains/%s/users/%s/roles/%s' %
(domain_id, user_id, role_id), None)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def list_user_roles_on_project(self, project_id, user_id):
"""list roles of a user on a project."""
resp, body = self.get('projects/%s/users/%s/roles' %
(project_id, user_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def list_user_roles_on_domain(self, domain_id, user_id):
"""list roles of a user on a domain."""
resp, body = self.get('domains/%s/users/%s/roles' %
(domain_id, user_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def delete_role_from_user_on_project(self, project_id, user_id, role_id):
"""Delete role of a user on a project."""
resp, body = self.delete('projects/%s/users/%s/roles/%s' %
(project_id, user_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def delete_role_from_user_on_domain(self, domain_id, user_id, role_id):
"""Delete role of a user on a domain."""
resp, body = self.delete('domains/%s/users/%s/roles/%s' %
(domain_id, user_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def check_user_role_existence_on_project(self, project_id,
user_id, role_id):
"""Check role of a user on a project."""
resp, body = self.head('projects/%s/users/%s/roles/%s' %
(project_id, user_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp)
def check_user_role_existence_on_domain(self, domain_id,
user_id, role_id):
"""Check role of a user on a domain."""
resp, body = self.head('domains/%s/users/%s/roles/%s' %
(domain_id, user_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp)
def assign_group_role_on_project(self, project_id, group_id, role_id):
"""Add roles to a user on a project."""
resp, body = self.put('projects/%s/groups/%s/roles/%s' %
(project_id, group_id, role_id), None)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def assign_group_role_on_domain(self, domain_id, group_id, role_id):
"""Add roles to a user on a domain."""
resp, body = self.put('domains/%s/groups/%s/roles/%s' %
(domain_id, group_id, role_id), None)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def list_group_roles_on_project(self, project_id, group_id):
"""list roles of a user on a project."""
resp, body = self.get('projects/%s/groups/%s/roles' %
(project_id, group_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def list_group_roles_on_domain(self, domain_id, group_id):
"""list roles of a user on a domain."""
resp, body = self.get('domains/%s/groups/%s/roles' %
(domain_id, group_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def delete_role_from_group_on_project(self, project_id, group_id, role_id):
"""Delete role of a user on a project."""
resp, body = self.delete('projects/%s/groups/%s/roles/%s' %
(project_id, group_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def delete_role_from_group_on_domain(self, domain_id, group_id, role_id):
"""Delete role of a user on a domain."""
resp, body = self.delete('domains/%s/groups/%s/roles/%s' %
(domain_id, group_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def check_role_from_group_on_project_existence(self, project_id,
group_id, role_id):
"""Check role of a user on a project."""
resp, body = self.head('projects/%s/groups/%s/roles/%s' %
(project_id, group_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp)
def check_role_from_group_on_domain_existence(self, domain_id,
group_id, role_id):
"""Check role of a user on a domain."""
resp, body = self.head('domains/%s/groups/%s/roles/%s' %
(domain_id, group_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp)

View File

@ -0,0 +1,185 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_serialization import jsonutils as json
from tempest.common import service_client
class RolesClient(service_client.ServiceClient):
api_version = "v3"
def create_role(self, **kwargs):
"""Create a Role.
Available params: see http://developer.openstack.org/
api-ref-identity-v3.html#createRole
"""
post_body = json.dumps({'role': kwargs})
resp, body = self.post('roles', post_body)
self.expected_success(201, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def show_role(self, role_id):
"""GET a Role."""
resp, body = self.get('roles/%s' % str(role_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def list_roles(self):
"""Get the list of Roles."""
resp, body = self.get("roles")
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def update_role(self, role_id, **kwargs):
"""Update a Role.
Available params: see http://developer.openstack.org/
api-ref-identity-v3.html#updateRole
"""
post_body = json.dumps({'role': kwargs})
resp, body = self.patch('roles/%s' % str(role_id), post_body)
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def delete_role(self, role_id):
"""Delete a role."""
resp, body = self.delete('roles/%s' % str(role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def assign_user_role_on_project(self, project_id, user_id, role_id):
"""Add roles to a user on a project."""
resp, body = self.put('projects/%s/users/%s/roles/%s' %
(project_id, user_id, role_id), None)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def assign_user_role_on_domain(self, domain_id, user_id, role_id):
"""Add roles to a user on a domain."""
resp, body = self.put('domains/%s/users/%s/roles/%s' %
(domain_id, user_id, role_id), None)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def list_user_roles_on_project(self, project_id, user_id):
"""list roles of a user on a project."""
resp, body = self.get('projects/%s/users/%s/roles' %
(project_id, user_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def list_user_roles_on_domain(self, domain_id, user_id):
"""list roles of a user on a domain."""
resp, body = self.get('domains/%s/users/%s/roles' %
(domain_id, user_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def delete_role_from_user_on_project(self, project_id, user_id, role_id):
"""Delete role of a user on a project."""
resp, body = self.delete('projects/%s/users/%s/roles/%s' %
(project_id, user_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def delete_role_from_user_on_domain(self, domain_id, user_id, role_id):
"""Delete role of a user on a domain."""
resp, body = self.delete('domains/%s/users/%s/roles/%s' %
(domain_id, user_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def check_user_role_existence_on_project(self, project_id,
user_id, role_id):
"""Check role of a user on a project."""
resp, body = self.head('projects/%s/users/%s/roles/%s' %
(project_id, user_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp)
def check_user_role_existence_on_domain(self, domain_id,
user_id, role_id):
"""Check role of a user on a domain."""
resp, body = self.head('domains/%s/users/%s/roles/%s' %
(domain_id, user_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp)
def assign_group_role_on_project(self, project_id, group_id, role_id):
"""Add roles to a user on a project."""
resp, body = self.put('projects/%s/groups/%s/roles/%s' %
(project_id, group_id, role_id), None)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def assign_group_role_on_domain(self, domain_id, group_id, role_id):
"""Add roles to a user on a domain."""
resp, body = self.put('domains/%s/groups/%s/roles/%s' %
(domain_id, group_id, role_id), None)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def list_group_roles_on_project(self, project_id, group_id):
"""list roles of a user on a project."""
resp, body = self.get('projects/%s/groups/%s/roles' %
(project_id, group_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def list_group_roles_on_domain(self, domain_id, group_id):
"""list roles of a user on a domain."""
resp, body = self.get('domains/%s/groups/%s/roles' %
(domain_id, group_id))
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def delete_role_from_group_on_project(self, project_id, group_id, role_id):
"""Delete role of a user on a project."""
resp, body = self.delete('projects/%s/groups/%s/roles/%s' %
(project_id, group_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def delete_role_from_group_on_domain(self, domain_id, group_id, role_id):
"""Delete role of a user on a domain."""
resp, body = self.delete('domains/%s/groups/%s/roles/%s' %
(domain_id, group_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def check_role_from_group_on_project_existence(self, project_id,
group_id, role_id):
"""Check role of a user on a project."""
resp, body = self.head('projects/%s/groups/%s/roles/%s' %
(project_id, group_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp)
def check_role_from_group_on_domain_existence(self, domain_id,
group_id, role_id):
"""Check role of a user on a domain."""
resp, body = self.head('domains/%s/groups/%s/roles/%s' %
(domain_id, group_id, role_id))
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp)

View File

@ -153,7 +153,7 @@ def stress_openstack(tests, duration, max_runs=None, stop_on_error=False):
else:
identity_client = admin_manager.identity_v3_client
projects_client = admin_manager.projects_client
roles_client = None
roles_client = admin_manager.roles_v3_client
users_client = admin_manager.users_v3_client
domains_client = admin_manager.domains_client
domain = (identity_client.auth_provider.credentials.

View File

@ -445,9 +445,9 @@ class BaseTestCase(testtools.testcase.WithAttributes,
domains_client = None
else:
client = self.os_admin.identity_v3_client
project_client = self.os_adm.projects_client
users_client = self.os_admin.users_v3_client
roles_client = None
project_client = self.os_admin.projects_client
roles_client = self.os_admin.roles_v3_client
domains_client = self.os_admin.domains_client
try: