Add /role_assignments endpoint support

This patch adds role assignments list support
to keystoneclient.

Created RoleAssignment resource and RoleAssignmentManager
classes. RoleAssignmentManager only implements the list()
method, the other inherited methods from base.CrudManager
raises a MethodNotImplemented error with customized messages.

This bp is complimented with the OSC part:
https://blueprints.launchpad.net/python-openstackclient/+spec/roles-assignment-list

Change-Id: I164b58b67ff42320238e943ddfa9d0a8aadd0a6d
Implements: blueprint roles-assignment-support
Closes-Bug: #1246310
This commit is contained in:
Rodrigo Duarte Sousa
2014-05-01 11:07:59 -03:00
committed by Rodrigo Duarte Sousa
parent f06efe9c4e
commit 0b0d2d3a9a
5 changed files with 343 additions and 0 deletions

View File

@@ -62,6 +62,10 @@ class VersionNotAvailable(DiscoveryFailure):
"""Discovery failed as the version you requested is not available."""
class MethodNotImplemented(ClientException):
"""Method not implemented by the keystoneclient API."""
class MissingAuthPlugin(ClientException):
"""An authenticated request is required but no plugin available."""

View File

@@ -22,6 +22,7 @@ import requests
import six
from six.moves.urllib import parse as urlparse
import testtools
import uuid
from keystoneclient.openstack.common import jsonutils
@@ -29,11 +30,14 @@ from keystoneclient.openstack.common import jsonutils
class TestCase(testtools.TestCase):
TEST_DOMAIN_ID = '1'
TEST_DOMAIN_NAME = 'aDomain'
TEST_GROUP_ID = uuid.uuid4().hex
TEST_ROLE_ID = uuid.uuid4().hex
TEST_TENANT_ID = '1'
TEST_TENANT_NAME = 'aTenant'
TEST_TOKEN = 'aToken'
TEST_TRUST_ID = 'aTrust'
TEST_USER = 'test'
TEST_USER_ID = uuid.uuid4().hex
TEST_ROOT_URL = 'http://127.0.0.1:5000/'
@@ -63,6 +67,8 @@ class TestCase(testtools.TestCase):
else:
url = base_url
# For urls containing queries
url = url.replace("/?", "?")
httpretty.register_uri(method, url, **kwargs)
def assertRequestBodyIs(self, body=None, json=None):

View File

@@ -0,0 +1,220 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import httpretty
from keystoneclient import exceptions
from keystoneclient.tests.v3 import utils
from keystoneclient.v3 import role_assignments
class RoleAssignmentsTests(utils.TestCase, utils.CrudTests):
def setUp(self):
super(RoleAssignmentsTests, self).setUp()
self.key = 'role_assignment'
self.collection_key = 'role_assignments'
self.model = role_assignments.RoleAssignment
self.manager = self.client.role_assignments
self.TEST_USER_DOMAIN_LIST = [{
'role': {
'id': self.TEST_ROLE_ID
},
'scope': {
'domain': {
'id': self.TEST_DOMAIN_ID
}
},
'user': {
'id': self.TEST_USER_ID
}
}]
self.TEST_GROUP_PROJECT_LIST = [{
'group': {
'id': self.TEST_GROUP_ID
},
'role': {
'id': self.TEST_ROLE_ID
},
'scope': {
'project': {
'id': self.TEST_TENANT_ID
}
}
}]
self.TEST_USER_PROJECT_LIST = [{
'user': {
'id': self.TEST_USER_ID
},
'role': {
'id': self.TEST_ROLE_ID
},
'scope': {
'project': {
'id': self.TEST_TENANT_ID
}
}
}]
self.TEST_ALL_RESPONSE_LIST = (self.TEST_USER_PROJECT_LIST +
self.TEST_GROUP_PROJECT_LIST +
self.TEST_USER_DOMAIN_LIST)
def _assert_returned_list(self, ref_list, returned_list):
self.assertEqual(len(ref_list), len(returned_list))
[self.assertIsInstance(r, self.model) for r in returned_list]
@httpretty.activate
def test_list_params(self):
ref_list = self.TEST_USER_PROJECT_LIST
self.stub_entity(httpretty.GET,
[self.collection_key,
'?scope.project.id=%s&user.id=%s' %
(self.TEST_TENANT_ID, self.TEST_USER_ID)],
entity=ref_list)
returned_list = self.manager.list(user=self.TEST_USER_ID,
project=self.TEST_TENANT_ID)
self._assert_returned_list(ref_list, returned_list)
kwargs = {'scope.project.id': self.TEST_TENANT_ID,
'user.id': self.TEST_USER_ID}
self.assertQueryStringContains(**kwargs)
@httpretty.activate
def test_all_assignments_list(self):
ref_list = self.TEST_ALL_RESPONSE_LIST
self.stub_entity(httpretty.GET,
[self.collection_key],
entity=ref_list)
returned_list = self.manager.list()
self._assert_returned_list(ref_list, returned_list)
kwargs = {}
self.assertQueryStringContains(**kwargs)
@httpretty.activate
def test_project_assignments_list(self):
ref_list = self.TEST_GROUP_PROJECT_LIST + self.TEST_USER_PROJECT_LIST
self.stub_entity(httpretty.GET,
[self.collection_key,
'?scope.project.id=%s' % self.TEST_TENANT_ID],
entity=ref_list)
returned_list = self.manager.list(project=self.TEST_TENANT_ID)
self._assert_returned_list(ref_list, returned_list)
kwargs = {'scope.project.id': self.TEST_TENANT_ID}
self.assertQueryStringContains(**kwargs)
@httpretty.activate
def test_domain_assignments_list(self):
ref_list = self.TEST_USER_DOMAIN_LIST
self.stub_entity(httpretty.GET,
[self.collection_key,
'?scope.domain.id=%s' % self.TEST_DOMAIN_ID],
entity=ref_list)
returned_list = self.manager.list(domain=self.TEST_DOMAIN_ID)
self._assert_returned_list(ref_list, returned_list)
kwargs = {'scope.domain.id': self.TEST_DOMAIN_ID}
self.assertQueryStringContains(**kwargs)
@httpretty.activate
def test_group_assignments_list(self):
ref_list = self.TEST_GROUP_PROJECT_LIST
self.stub_entity(httpretty.GET,
[self.collection_key,
'?group.id=%s' % self.TEST_GROUP_ID],
entity=ref_list)
returned_list = self.manager.list(group=self.TEST_GROUP_ID)
self._assert_returned_list(ref_list, returned_list)
kwargs = {'group.id': self.TEST_GROUP_ID}
self.assertQueryStringContains(**kwargs)
@httpretty.activate
def test_user_assignments_list(self):
ref_list = self.TEST_USER_DOMAIN_LIST + self.TEST_USER_PROJECT_LIST
self.stub_entity(httpretty.GET,
[self.collection_key,
'?user.id=%s' % self.TEST_USER_ID],
entity=ref_list)
returned_list = self.manager.list(user=self.TEST_USER_ID)
self._assert_returned_list(ref_list, returned_list)
kwargs = {'user.id': self.TEST_USER_ID}
self.assertQueryStringContains(**kwargs)
@httpretty.activate
def test_effective_assignments_list(self):
ref_list = self.TEST_USER_PROJECT_LIST + self.TEST_USER_DOMAIN_LIST
self.stub_entity(httpretty.GET,
[self.collection_key,
'?effective=True'],
entity=ref_list)
returned_list = self.manager.list(effective=True)
self._assert_returned_list(ref_list, returned_list)
kwargs = {'effective': 'True'}
self.assertQueryStringContains(**kwargs)
@httpretty.activate
def test_role_assignments_list(self):
ref_list = self.TEST_ALL_RESPONSE_LIST
self.stub_entity(httpretty.GET,
[self.collection_key,
'?role.id=' + self.TEST_ROLE_ID],
entity=ref_list)
returned_list = self.manager.list(role=self.TEST_ROLE_ID)
self._assert_returned_list(ref_list, returned_list)
kwargs = {'role.id': self.TEST_ROLE_ID}
self.assertQueryStringContains(**kwargs)
def test_domain_and_project_list(self):
# Should only accept either domain or project, never both
self.assertRaises(exceptions.ValidationError,
self.manager.list,
domain=self.TEST_DOMAIN_ID,
project=self.TEST_TENANT_ID)
def test_user_and_group_list(self):
# Should only accept either user or group, never both
self.assertRaises(exceptions.ValidationError, self.manager.list,
user=self.TEST_USER_ID, group=self.TEST_GROUP_ID)
def test_create(self):
# Create not supported for role assignments
self.assertRaises(exceptions.MethodNotImplemented, self.manager.create)
def test_update(self):
# Update not supported for role assignments
self.assertRaises(exceptions.MethodNotImplemented, self.manager.update)
def test_delete(self):
# Delete not supported for role assignments
self.assertRaises(exceptions.MethodNotImplemented, self.manager.delete)
def test_get(self):
# Get not supported for role assignments
self.assertRaises(exceptions.MethodNotImplemented, self.manager.get)
def test_find(self):
# Find not supported for role assignments
self.assertRaises(exceptions.MethodNotImplemented, self.manager.find)

View File

@@ -30,6 +30,7 @@ from keystoneclient.v3 import groups
from keystoneclient.v3 import policies
from keystoneclient.v3 import projects
from keystoneclient.v3 import regions
from keystoneclient.v3 import role_assignments
from keystoneclient.v3 import roles
from keystoneclient.v3 import services
from keystoneclient.v3 import users
@@ -96,6 +97,7 @@ class Client(httpclient.HTTPClient):
"""Initialize a new client for the Keystone v3 API."""
super(Client, self).__init__(**kwargs)
self.role_assignments = role_assignments.RoleAssignmentManager(self)
self.credentials = credentials.CredentialManager(self)
self.endpoints = endpoints.EndpointManager(self)
self.domains = domains.DomainManager(self)

View File

@@ -0,0 +1,111 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneclient import base
from keystoneclient import exceptions
class RoleAssignment(base.Resource):
"""Represents an Identity role assignment.
Attributes:
* role: an object which contains a role uuid
* user or group: an object which contains either a user or
group uuid
* scope: an object which has either a project or domain object
containing an uuid
"""
pass
class RoleAssignmentManager(base.CrudManager):
"""Manager class for manipulating Identity roles assignments."""
resource_class = RoleAssignment
collection_key = 'role_assignments'
key = 'role_assignment'
def _check_not_user_and_group(self, user, group):
if user and group:
msg = 'Specify either a user or group, not both'
raise exceptions.ValidationError(msg)
def _check_not_domain_and_project(self, domain, project):
if domain and project:
msg = 'Specify either a domain or project, not both'
raise exceptions.ValidationError(msg)
def list(self, user=None, group=None, project=None, domain=None, role=None,
effective=False):
"""Lists role assignments.
If no arguments are provided, all role assignments in the
system will be listed.
If both user and group are provided, a ValidationError will be
raised. If both domain and project are provided, it will also
raise a ValidationError.
:param user: User to be used as query filter. (optional)
:param group: Group to be used as query filter. (optional)
:param project: Project to be used as query filter.
(optional)
:param domain: Domain to be used as query
filter. (optional)
:param role: Role to be used as query filter. (optional)
:param boolean effective: return effective role
assignments. (optional)
"""
self._check_not_user_and_group(user, group)
self._check_not_domain_and_project(domain, project)
query_params = {}
if user:
query_params['user.id'] = base.getid(user)
if group:
query_params['group.id'] = base.getid(group)
if project:
query_params['scope.project.id'] = base.getid(project)
if domain:
query_params['scope.domain.id'] = base.getid(domain)
if role:
query_params['role.id'] = base.getid(role)
if effective:
query_params['effective'] = effective
return super(RoleAssignmentManager, self).list(**query_params)
def create(self, **kwargs):
raise exceptions.MethodNotImplemented('Create not supported for'
' role assignments')
def update(self, **kwargs):
raise exceptions.MethodNotImplemented('Update not supported for'
' role assignments')
def get(self, **kwargs):
raise exceptions.MethodNotImplemented('Get not supported for'
' role assignments')
def find(self, **kwargs):
raise exceptions.MethodNotImplemented('Find not supported for'
' role assignments')
def put(self, **kwargs):
raise exceptions.MethodNotImplemented('Put not supported for'
' role assignments')
def delete(self, **kwargs):
raise exceptions.MethodNotImplemented('Delete not supported for'
' role assignments')