diff --git a/keystoneclient/exceptions.py b/keystoneclient/exceptions.py index d01f3d290..06e6c57dd 100644 --- a/keystoneclient/exceptions.py +++ b/keystoneclient/exceptions.py @@ -9,6 +9,10 @@ class CommandError(Exception): pass +class ValidationError(Exception): + pass + + class AuthorizationFailure(Exception): pass diff --git a/keystoneclient/v3/roles.py b/keystoneclient/v3/roles.py index 0e0c180fb..690602575 100644 --- a/keystoneclient/v3/roles.py +++ b/keystoneclient/v3/roles.py @@ -15,6 +15,7 @@ # under the License. from keystoneclient import base +from keystoneclient import exceptions class Role(base.Resource): @@ -34,6 +35,23 @@ class RoleManager(base.CrudManager): collection_key = 'roles' key = 'role' + def _role_grants_base_url(self, user, domain, project): + params = {'user_id': base.getid(user)} + + if domain: + params['domain_id'] = base.getid(domain) + base_url = '/domains/%(domain_id)s/users/%(user_id)s' + elif project: + params['project_id'] = base.getid(project) + base_url = '/projects/%(project_id)s/users/%(user_id)s' + + return base_url % params + + def _require_domain_or_project(self, domain, project): + if (domain and project) or (not domain and not project): + msg = 'Specify either a domain or project, not both' + raise exceptions.ValidationError(msg) + def create(self, name): return super(RoleManager, self).create( name=name) @@ -42,6 +60,22 @@ class RoleManager(base.CrudManager): return super(RoleManager, self).get( role_id=base.getid(role)) + def list(self, user=None, domain=None, project=None): + """Lists roles and role grants. + + If no arguments are provided, all roles in the system will be listed. + + If a user is specified, you must also specify either a domain or + project to list role grants on that pair. + """ + + if user: + self._require_domain_or_project(domain, project) + return super(RoleManager, self).list( + base_url=self._role_grants_base_url(user, domain, project)) + + return super(RoleManager, self).list() + def update(self, role, name=None): return super(RoleManager, self).update( role_id=base.getid(role), @@ -50,3 +84,27 @@ class RoleManager(base.CrudManager): def delete(self, role): return super(RoleManager, self).delete( role_id=base.getid(role)) + + def grant(self, role, user, domain=None, project=None): + """Grants a role to a user on either a domain or project.""" + self._require_domain_or_project(domain, project) + + return super(RoleManager, self).put( + base_url=self._role_grants_base_url(user, domain, project), + role_id=base.getid(role)) + + def check(self, role, user, domain=None, project=None): + """Grants a role to a user on either a domain or project.""" + self._require_domain_or_project(domain, project) + + return super(RoleManager, self).head( + base_url=self._role_grants_base_url(user, domain, project), + role_id=base.getid(role)) + + def revoke(self, role, user, domain=None, project=None): + """Revokes a role from a user on either a domain or project.""" + self._require_domain_or_project(domain, project) + + return super(RoleManager, self).delete( + base_url=self._role_grants_base_url(user, domain, project), + role_id=base.getid(role)) diff --git a/tests/v3/test_roles.py b/tests/v3/test_roles.py index ef1f7ce39..e3fe35314 100644 --- a/tests/v3/test_roles.py +++ b/tests/v3/test_roles.py @@ -1,5 +1,8 @@ +import httplib2 +import urlparse import uuid +from keystoneclient import exceptions from keystoneclient.v3 import roles from tests.v3 import utils @@ -17,3 +20,233 @@ class RoleTests(utils.TestCase, utils.CrudTests): kwargs = super(RoleTests, self).new_ref(**kwargs) kwargs.setdefault('name', uuid.uuid4().hex) return kwargs + + def test_domain_role_grant(self): + user_id = uuid.uuid4().hex + domain_id = uuid.uuid4().hex + ref = self.new_ref() + resp = httplib2.Response({ + 'status': 201, + 'body': '', + }) + + method = 'PUT' + httplib2.Http.request( + urlparse.urljoin( + self.TEST_URL, + 'v3/domains/%s/users/%s/%s/%s' % ( + domain_id, user_id, self.collection_key, ref['id'])), + method, + headers=self.headers[method]) \ + .AndReturn((resp, resp['body'])) + self.mox.ReplayAll() + + self.manager.grant(role=ref['id'], domain=domain_id, user=user_id) + + def test_domain_role_list(self): + user_id = uuid.uuid4().hex + domain_id = uuid.uuid4().hex + ref_list = [self.new_ref(), self.new_ref()] + resp = httplib2.Response({ + 'status': 200, + 'body': self.serialize(ref_list), + }) + + method = 'GET' + httplib2.Http.request( + urlparse.urljoin( + self.TEST_URL, + 'v3/domains/%s/users/%s/%s' % ( + domain_id, user_id, self.collection_key)), + method, + headers=self.headers[method]) \ + .AndReturn((resp, resp['body'])) + self.mox.ReplayAll() + + self.manager.list(domain=domain_id, user=user_id) + + def test_domain_role_check(self): + user_id = uuid.uuid4().hex + domain_id = uuid.uuid4().hex + ref = self.new_ref() + resp = httplib2.Response({ + 'status': 200, + 'body': '', + }) + + method = 'HEAD' + httplib2.Http.request( + urlparse.urljoin( + self.TEST_URL, + 'v3/domains/%s/users/%s/%s/%s' % ( + domain_id, user_id, self.collection_key, ref['id'])), + method, + headers=self.headers[method]) \ + .AndReturn((resp, resp['body'])) + self.mox.ReplayAll() + + self.manager.check(role=ref['id'], domain=domain_id, user=user_id) + + def test_domain_role_revoke(self): + user_id = uuid.uuid4().hex + domain_id = uuid.uuid4().hex + ref = self.new_ref() + resp = httplib2.Response({ + 'status': 204, + 'body': '', + }) + + method = 'DELETE' + httplib2.Http.request( + urlparse.urljoin( + self.TEST_URL, + 'v3/domains/%s/users/%s/%s/%s' % ( + domain_id, user_id, self.collection_key, ref['id'])), + method, + headers=self.headers[method]) \ + .AndReturn((resp, resp['body'])) + self.mox.ReplayAll() + + self.manager.revoke(role=ref['id'], domain=domain_id, user=user_id) + + def test_project_role_grant(self): + user_id = uuid.uuid4().hex + project_id = uuid.uuid4().hex + ref = self.new_ref() + resp = httplib2.Response({ + 'status': 201, + 'body': '', + }) + + method = 'PUT' + httplib2.Http.request( + urlparse.urljoin( + self.TEST_URL, + 'v3/projects/%s/users/%s/%s/%s' % ( + project_id, user_id, self.collection_key, ref['id'])), + method, + headers=self.headers[method]) \ + .AndReturn((resp, resp['body'])) + self.mox.ReplayAll() + + self.manager.grant(role=ref['id'], project=project_id, user=user_id) + + def test_project_role_list(self): + user_id = uuid.uuid4().hex + project_id = uuid.uuid4().hex + ref_list = [self.new_ref(), self.new_ref()] + resp = httplib2.Response({ + 'status': 200, + 'body': self.serialize(ref_list), + }) + + method = 'GET' + httplib2.Http.request( + urlparse.urljoin( + self.TEST_URL, + 'v3/projects/%s/users/%s/%s' % ( + project_id, user_id, self.collection_key)), + method, + headers=self.headers[method]) \ + .AndReturn((resp, resp['body'])) + self.mox.ReplayAll() + + self.manager.list(project=project_id, user=user_id) + + def test_project_role_check(self): + user_id = uuid.uuid4().hex + project_id = uuid.uuid4().hex + ref = self.new_ref() + resp = httplib2.Response({ + 'status': 200, + 'body': '', + }) + + method = 'HEAD' + httplib2.Http.request( + urlparse.urljoin( + self.TEST_URL, + 'v3/projects/%s/users/%s/%s/%s' % ( + project_id, user_id, self.collection_key, ref['id'])), + method, + headers=self.headers[method]) \ + .AndReturn((resp, resp['body'])) + self.mox.ReplayAll() + + self.manager.check(role=ref['id'], project=project_id, user=user_id) + + def test_project_role_revoke(self): + user_id = uuid.uuid4().hex + project_id = uuid.uuid4().hex + ref = self.new_ref() + resp = httplib2.Response({ + 'status': 204, + 'body': '', + }) + + method = 'DELETE' + httplib2.Http.request( + urlparse.urljoin( + self.TEST_URL, + 'v3/projects/%s/users/%s/%s/%s' % ( + project_id, user_id, self.collection_key, ref['id'])), + method, + headers=self.headers[method]) \ + .AndReturn((resp, resp['body'])) + self.mox.ReplayAll() + + self.manager.revoke(role=ref['id'], project=project_id, user=user_id) + + def test_domain_project_role_grant_fails(self): + user_id = uuid.uuid4().hex + project_id = uuid.uuid4().hex + domain_id = uuid.uuid4().hex + ref = self.new_ref() + + self.assertRaises( + exceptions.ValidationError, + self.manager.grant, + role=ref['id'], + domain=domain_id, + project=project_id, + user=user_id) + + def test_domain_project_role_list_fails(self): + user_id = uuid.uuid4().hex + project_id = uuid.uuid4().hex + domain_id = uuid.uuid4().hex + + self.assertRaises( + exceptions.ValidationError, + self.manager.list, + domain=domain_id, + project=project_id, + user=user_id) + + def test_domain_project_role_check_fails(self): + user_id = uuid.uuid4().hex + project_id = uuid.uuid4().hex + domain_id = uuid.uuid4().hex + ref = self.new_ref() + + self.assertRaises( + exceptions.ValidationError, + self.manager.check, + role=ref['id'], + domain=domain_id, + project=project_id, + user=user_id) + + def test_domain_project_role_revoke_fails(self): + user_id = uuid.uuid4().hex + project_id = uuid.uuid4().hex + domain_id = uuid.uuid4().hex + ref = self.new_ref() + + self.assertRaises( + exceptions.ValidationError, + self.manager.revoke, + role=ref['id'], + domain=domain_id, + project=project_id, + user=user_id) diff --git a/tests/v3/utils.py b/tests/v3/utils.py index af8d68bbe..d45a07cba 100644 --- a/tests/v3/utils.py +++ b/tests/v3/utils.py @@ -91,7 +91,9 @@ class CrudTests(object): } } + self.headers['HEAD'] = self.headers['GET'].copy() self.headers['DELETE'] = self.headers['GET'].copy() + self.headers['PUT'] = self.headers['GET'].copy() self.headers['POST'] = self.headers['GET'].copy() self.headers['POST']['Content-Type'] = 'application/json' self.headers['PATCH'] = self.headers['POST'].copy()