diff --git a/keystoneclient/exceptions.py b/keystoneclient/exceptions.py index 31f25cb70..f441daf4e 100644 --- a/keystoneclient/exceptions.py +++ b/keystoneclient/exceptions.py @@ -62,6 +62,10 @@ class VersionNotAvailable(DiscoveryFailure): """Discovery failed as the version you requested is not available.""" +class MethodNotImplemented(ClientException): + """Method not implemented by the keystoneclient API.""" + + class MissingAuthPlugin(ClientException): """An authenticated request is required but no plugin available.""" diff --git a/keystoneclient/tests/utils.py b/keystoneclient/tests/utils.py index cef8608fd..9285b3854 100644 --- a/keystoneclient/tests/utils.py +++ b/keystoneclient/tests/utils.py @@ -22,6 +22,7 @@ import requests import six from six.moves.urllib import parse as urlparse import testtools +import uuid from keystoneclient.openstack.common import jsonutils @@ -29,11 +30,14 @@ from keystoneclient.openstack.common import jsonutils class TestCase(testtools.TestCase): TEST_DOMAIN_ID = '1' TEST_DOMAIN_NAME = 'aDomain' + TEST_GROUP_ID = uuid.uuid4().hex + TEST_ROLE_ID = uuid.uuid4().hex TEST_TENANT_ID = '1' TEST_TENANT_NAME = 'aTenant' TEST_TOKEN = 'aToken' TEST_TRUST_ID = 'aTrust' TEST_USER = 'test' + TEST_USER_ID = uuid.uuid4().hex TEST_ROOT_URL = 'http://127.0.0.1:5000/' @@ -63,6 +67,8 @@ class TestCase(testtools.TestCase): else: url = base_url + # For urls containing queries + url = url.replace("/?", "?") httpretty.register_uri(method, url, **kwargs) def assertRequestBodyIs(self, body=None, json=None): diff --git a/keystoneclient/tests/v3/test_role_assignments.py b/keystoneclient/tests/v3/test_role_assignments.py new file mode 100644 index 000000000..a28024e4a --- /dev/null +++ b/keystoneclient/tests/v3/test_role_assignments.py @@ -0,0 +1,220 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import httpretty + +from keystoneclient import exceptions +from keystoneclient.tests.v3 import utils +from keystoneclient.v3 import role_assignments + + +class RoleAssignmentsTests(utils.TestCase, utils.CrudTests): + + def setUp(self): + super(RoleAssignmentsTests, self).setUp() + self.key = 'role_assignment' + self.collection_key = 'role_assignments' + self.model = role_assignments.RoleAssignment + self.manager = self.client.role_assignments + self.TEST_USER_DOMAIN_LIST = [{ + 'role': { + 'id': self.TEST_ROLE_ID + }, + 'scope': { + 'domain': { + 'id': self.TEST_DOMAIN_ID + } + }, + 'user': { + 'id': self.TEST_USER_ID + } + }] + self.TEST_GROUP_PROJECT_LIST = [{ + 'group': { + 'id': self.TEST_GROUP_ID + }, + 'role': { + 'id': self.TEST_ROLE_ID + }, + 'scope': { + 'project': { + 'id': self.TEST_TENANT_ID + } + } + }] + self.TEST_USER_PROJECT_LIST = [{ + 'user': { + 'id': self.TEST_USER_ID + }, + 'role': { + 'id': self.TEST_ROLE_ID + }, + 'scope': { + 'project': { + 'id': self.TEST_TENANT_ID + } + } + }] + + self.TEST_ALL_RESPONSE_LIST = (self.TEST_USER_PROJECT_LIST + + self.TEST_GROUP_PROJECT_LIST + + self.TEST_USER_DOMAIN_LIST) + + def _assert_returned_list(self, ref_list, returned_list): + self.assertEqual(len(ref_list), len(returned_list)) + [self.assertIsInstance(r, self.model) for r in returned_list] + + @httpretty.activate + def test_list_params(self): + ref_list = self.TEST_USER_PROJECT_LIST + self.stub_entity(httpretty.GET, + [self.collection_key, + '?scope.project.id=%s&user.id=%s' % + (self.TEST_TENANT_ID, self.TEST_USER_ID)], + entity=ref_list) + + returned_list = self.manager.list(user=self.TEST_USER_ID, + project=self.TEST_TENANT_ID) + self._assert_returned_list(ref_list, returned_list) + + kwargs = {'scope.project.id': self.TEST_TENANT_ID, + 'user.id': self.TEST_USER_ID} + self.assertQueryStringContains(**kwargs) + + @httpretty.activate + def test_all_assignments_list(self): + ref_list = self.TEST_ALL_RESPONSE_LIST + self.stub_entity(httpretty.GET, + [self.collection_key], + entity=ref_list) + + returned_list = self.manager.list() + self._assert_returned_list(ref_list, returned_list) + + kwargs = {} + self.assertQueryStringContains(**kwargs) + + @httpretty.activate + def test_project_assignments_list(self): + ref_list = self.TEST_GROUP_PROJECT_LIST + self.TEST_USER_PROJECT_LIST + self.stub_entity(httpretty.GET, + [self.collection_key, + '?scope.project.id=%s' % self.TEST_TENANT_ID], + entity=ref_list) + + returned_list = self.manager.list(project=self.TEST_TENANT_ID) + self._assert_returned_list(ref_list, returned_list) + + kwargs = {'scope.project.id': self.TEST_TENANT_ID} + self.assertQueryStringContains(**kwargs) + + @httpretty.activate + def test_domain_assignments_list(self): + ref_list = self.TEST_USER_DOMAIN_LIST + self.stub_entity(httpretty.GET, + [self.collection_key, + '?scope.domain.id=%s' % self.TEST_DOMAIN_ID], + entity=ref_list) + + returned_list = self.manager.list(domain=self.TEST_DOMAIN_ID) + self._assert_returned_list(ref_list, returned_list) + + kwargs = {'scope.domain.id': self.TEST_DOMAIN_ID} + self.assertQueryStringContains(**kwargs) + + @httpretty.activate + def test_group_assignments_list(self): + ref_list = self.TEST_GROUP_PROJECT_LIST + self.stub_entity(httpretty.GET, + [self.collection_key, + '?group.id=%s' % self.TEST_GROUP_ID], + entity=ref_list) + + returned_list = self.manager.list(group=self.TEST_GROUP_ID) + self._assert_returned_list(ref_list, returned_list) + + kwargs = {'group.id': self.TEST_GROUP_ID} + self.assertQueryStringContains(**kwargs) + + @httpretty.activate + def test_user_assignments_list(self): + ref_list = self.TEST_USER_DOMAIN_LIST + self.TEST_USER_PROJECT_LIST + self.stub_entity(httpretty.GET, + [self.collection_key, + '?user.id=%s' % self.TEST_USER_ID], + entity=ref_list) + + returned_list = self.manager.list(user=self.TEST_USER_ID) + self._assert_returned_list(ref_list, returned_list) + + kwargs = {'user.id': self.TEST_USER_ID} + self.assertQueryStringContains(**kwargs) + + @httpretty.activate + def test_effective_assignments_list(self): + ref_list = self.TEST_USER_PROJECT_LIST + self.TEST_USER_DOMAIN_LIST + self.stub_entity(httpretty.GET, + [self.collection_key, + '?effective=True'], + entity=ref_list) + + returned_list = self.manager.list(effective=True) + self._assert_returned_list(ref_list, returned_list) + + kwargs = {'effective': 'True'} + self.assertQueryStringContains(**kwargs) + + @httpretty.activate + def test_role_assignments_list(self): + ref_list = self.TEST_ALL_RESPONSE_LIST + self.stub_entity(httpretty.GET, + [self.collection_key, + '?role.id=' + self.TEST_ROLE_ID], + entity=ref_list) + + returned_list = self.manager.list(role=self.TEST_ROLE_ID) + self._assert_returned_list(ref_list, returned_list) + + kwargs = {'role.id': self.TEST_ROLE_ID} + self.assertQueryStringContains(**kwargs) + + def test_domain_and_project_list(self): + # Should only accept either domain or project, never both + self.assertRaises(exceptions.ValidationError, + self.manager.list, + domain=self.TEST_DOMAIN_ID, + project=self.TEST_TENANT_ID) + + def test_user_and_group_list(self): + # Should only accept either user or group, never both + self.assertRaises(exceptions.ValidationError, self.manager.list, + user=self.TEST_USER_ID, group=self.TEST_GROUP_ID) + + def test_create(self): + # Create not supported for role assignments + self.assertRaises(exceptions.MethodNotImplemented, self.manager.create) + + def test_update(self): + # Update not supported for role assignments + self.assertRaises(exceptions.MethodNotImplemented, self.manager.update) + + def test_delete(self): + # Delete not supported for role assignments + self.assertRaises(exceptions.MethodNotImplemented, self.manager.delete) + + def test_get(self): + # Get not supported for role assignments + self.assertRaises(exceptions.MethodNotImplemented, self.manager.get) + + def test_find(self): + # Find not supported for role assignments + self.assertRaises(exceptions.MethodNotImplemented, self.manager.find) diff --git a/keystoneclient/v3/client.py b/keystoneclient/v3/client.py index a9053ba2c..4be860e97 100644 --- a/keystoneclient/v3/client.py +++ b/keystoneclient/v3/client.py @@ -30,6 +30,7 @@ from keystoneclient.v3 import groups from keystoneclient.v3 import policies from keystoneclient.v3 import projects from keystoneclient.v3 import regions +from keystoneclient.v3 import role_assignments from keystoneclient.v3 import roles from keystoneclient.v3 import services from keystoneclient.v3 import users @@ -96,6 +97,7 @@ class Client(httpclient.HTTPClient): """Initialize a new client for the Keystone v3 API.""" super(Client, self).__init__(**kwargs) + self.role_assignments = role_assignments.RoleAssignmentManager(self) self.credentials = credentials.CredentialManager(self) self.endpoints = endpoints.EndpointManager(self) self.domains = domains.DomainManager(self) diff --git a/keystoneclient/v3/role_assignments.py b/keystoneclient/v3/role_assignments.py new file mode 100644 index 000000000..5394c3d21 --- /dev/null +++ b/keystoneclient/v3/role_assignments.py @@ -0,0 +1,111 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from keystoneclient import base +from keystoneclient import exceptions + + +class RoleAssignment(base.Resource): + + """Represents an Identity role assignment. + + Attributes: + * role: an object which contains a role uuid + * user or group: an object which contains either a user or + group uuid + * scope: an object which has either a project or domain object + containing an uuid + """ + pass + + +class RoleAssignmentManager(base.CrudManager): + + """Manager class for manipulating Identity roles assignments.""" + resource_class = RoleAssignment + collection_key = 'role_assignments' + key = 'role_assignment' + + def _check_not_user_and_group(self, user, group): + if user and group: + msg = 'Specify either a user or group, not both' + raise exceptions.ValidationError(msg) + + def _check_not_domain_and_project(self, domain, project): + if domain and project: + msg = 'Specify either a domain or project, not both' + raise exceptions.ValidationError(msg) + + def list(self, user=None, group=None, project=None, domain=None, role=None, + effective=False): + """Lists role assignments. + + If no arguments are provided, all role assignments in the + system will be listed. + + If both user and group are provided, a ValidationError will be + raised. If both domain and project are provided, it will also + raise a ValidationError. + + :param user: User to be used as query filter. (optional) + :param group: Group to be used as query filter. (optional) + :param project: Project to be used as query filter. + (optional) + :param domain: Domain to be used as query + filter. (optional) + :param role: Role to be used as query filter. (optional) + :param boolean effective: return effective role + assignments. (optional) + """ + + self._check_not_user_and_group(user, group) + self._check_not_domain_and_project(domain, project) + + query_params = {} + if user: + query_params['user.id'] = base.getid(user) + if group: + query_params['group.id'] = base.getid(group) + if project: + query_params['scope.project.id'] = base.getid(project) + if domain: + query_params['scope.domain.id'] = base.getid(domain) + if role: + query_params['role.id'] = base.getid(role) + if effective: + query_params['effective'] = effective + + return super(RoleAssignmentManager, self).list(**query_params) + + def create(self, **kwargs): + raise exceptions.MethodNotImplemented('Create not supported for' + ' role assignments') + + def update(self, **kwargs): + raise exceptions.MethodNotImplemented('Update not supported for' + ' role assignments') + + def get(self, **kwargs): + raise exceptions.MethodNotImplemented('Get not supported for' + ' role assignments') + + def find(self, **kwargs): + raise exceptions.MethodNotImplemented('Find not supported for' + ' role assignments') + + def put(self, **kwargs): + raise exceptions.MethodNotImplemented('Put not supported for' + ' role assignments') + + def delete(self, **kwargs): + raise exceptions.MethodNotImplemented('Delete not supported for' + ' role assignments')