v3 Domain/Project role grants

Change-Id: Idbe0702b42603d6f9f133c9f1855ea9b4f222066
This commit is contained in:
Dolph Mathews
2012-09-13 07:55:26 -05:00
committed by Gerrit Code Review
parent 0ee5147030
commit 46360085eb
4 changed files with 297 additions and 0 deletions

View File

@@ -9,6 +9,10 @@ class CommandError(Exception):
pass
class ValidationError(Exception):
pass
class AuthorizationFailure(Exception):
pass

View File

@@ -15,6 +15,7 @@
# under the License.
from keystoneclient import base
from keystoneclient import exceptions
class Role(base.Resource):
@@ -34,6 +35,23 @@ class RoleManager(base.CrudManager):
collection_key = 'roles'
key = 'role'
def _role_grants_base_url(self, user, domain, project):
params = {'user_id': base.getid(user)}
if domain:
params['domain_id'] = base.getid(domain)
base_url = '/domains/%(domain_id)s/users/%(user_id)s'
elif project:
params['project_id'] = base.getid(project)
base_url = '/projects/%(project_id)s/users/%(user_id)s'
return base_url % params
def _require_domain_or_project(self, domain, project):
if (domain and project) or (not domain and not project):
msg = 'Specify either a domain or project, not both'
raise exceptions.ValidationError(msg)
def create(self, name):
return super(RoleManager, self).create(
name=name)
@@ -42,6 +60,22 @@ class RoleManager(base.CrudManager):
return super(RoleManager, self).get(
role_id=base.getid(role))
def list(self, user=None, domain=None, project=None):
"""Lists roles and role grants.
If no arguments are provided, all roles in the system will be listed.
If a user is specified, you must also specify either a domain or
project to list role grants on that pair.
"""
if user:
self._require_domain_or_project(domain, project)
return super(RoleManager, self).list(
base_url=self._role_grants_base_url(user, domain, project))
return super(RoleManager, self).list()
def update(self, role, name=None):
return super(RoleManager, self).update(
role_id=base.getid(role),
@@ -50,3 +84,27 @@ class RoleManager(base.CrudManager):
def delete(self, role):
return super(RoleManager, self).delete(
role_id=base.getid(role))
def grant(self, role, user, domain=None, project=None):
"""Grants a role to a user on either a domain or project."""
self._require_domain_or_project(domain, project)
return super(RoleManager, self).put(
base_url=self._role_grants_base_url(user, domain, project),
role_id=base.getid(role))
def check(self, role, user, domain=None, project=None):
"""Grants a role to a user on either a domain or project."""
self._require_domain_or_project(domain, project)
return super(RoleManager, self).head(
base_url=self._role_grants_base_url(user, domain, project),
role_id=base.getid(role))
def revoke(self, role, user, domain=None, project=None):
"""Revokes a role from a user on either a domain or project."""
self._require_domain_or_project(domain, project)
return super(RoleManager, self).delete(
base_url=self._role_grants_base_url(user, domain, project),
role_id=base.getid(role))

View File

@@ -1,5 +1,8 @@
import httplib2
import urlparse
import uuid
from keystoneclient import exceptions
from keystoneclient.v3 import roles
from tests.v3 import utils
@@ -17,3 +20,233 @@ class RoleTests(utils.TestCase, utils.CrudTests):
kwargs = super(RoleTests, self).new_ref(**kwargs)
kwargs.setdefault('name', uuid.uuid4().hex)
return kwargs
def test_domain_role_grant(self):
user_id = uuid.uuid4().hex
domain_id = uuid.uuid4().hex
ref = self.new_ref()
resp = httplib2.Response({
'status': 201,
'body': '',
})
method = 'PUT'
httplib2.Http.request(
urlparse.urljoin(
self.TEST_URL,
'v3/domains/%s/users/%s/%s/%s' % (
domain_id, user_id, self.collection_key, ref['id'])),
method,
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
self.manager.grant(role=ref['id'], domain=domain_id, user=user_id)
def test_domain_role_list(self):
user_id = uuid.uuid4().hex
domain_id = uuid.uuid4().hex
ref_list = [self.new_ref(), self.new_ref()]
resp = httplib2.Response({
'status': 200,
'body': self.serialize(ref_list),
})
method = 'GET'
httplib2.Http.request(
urlparse.urljoin(
self.TEST_URL,
'v3/domains/%s/users/%s/%s' % (
domain_id, user_id, self.collection_key)),
method,
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
self.manager.list(domain=domain_id, user=user_id)
def test_domain_role_check(self):
user_id = uuid.uuid4().hex
domain_id = uuid.uuid4().hex
ref = self.new_ref()
resp = httplib2.Response({
'status': 200,
'body': '',
})
method = 'HEAD'
httplib2.Http.request(
urlparse.urljoin(
self.TEST_URL,
'v3/domains/%s/users/%s/%s/%s' % (
domain_id, user_id, self.collection_key, ref['id'])),
method,
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
self.manager.check(role=ref['id'], domain=domain_id, user=user_id)
def test_domain_role_revoke(self):
user_id = uuid.uuid4().hex
domain_id = uuid.uuid4().hex
ref = self.new_ref()
resp = httplib2.Response({
'status': 204,
'body': '',
})
method = 'DELETE'
httplib2.Http.request(
urlparse.urljoin(
self.TEST_URL,
'v3/domains/%s/users/%s/%s/%s' % (
domain_id, user_id, self.collection_key, ref['id'])),
method,
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
self.manager.revoke(role=ref['id'], domain=domain_id, user=user_id)
def test_project_role_grant(self):
user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex
ref = self.new_ref()
resp = httplib2.Response({
'status': 201,
'body': '',
})
method = 'PUT'
httplib2.Http.request(
urlparse.urljoin(
self.TEST_URL,
'v3/projects/%s/users/%s/%s/%s' % (
project_id, user_id, self.collection_key, ref['id'])),
method,
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
self.manager.grant(role=ref['id'], project=project_id, user=user_id)
def test_project_role_list(self):
user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex
ref_list = [self.new_ref(), self.new_ref()]
resp = httplib2.Response({
'status': 200,
'body': self.serialize(ref_list),
})
method = 'GET'
httplib2.Http.request(
urlparse.urljoin(
self.TEST_URL,
'v3/projects/%s/users/%s/%s' % (
project_id, user_id, self.collection_key)),
method,
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
self.manager.list(project=project_id, user=user_id)
def test_project_role_check(self):
user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex
ref = self.new_ref()
resp = httplib2.Response({
'status': 200,
'body': '',
})
method = 'HEAD'
httplib2.Http.request(
urlparse.urljoin(
self.TEST_URL,
'v3/projects/%s/users/%s/%s/%s' % (
project_id, user_id, self.collection_key, ref['id'])),
method,
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
self.manager.check(role=ref['id'], project=project_id, user=user_id)
def test_project_role_revoke(self):
user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex
ref = self.new_ref()
resp = httplib2.Response({
'status': 204,
'body': '',
})
method = 'DELETE'
httplib2.Http.request(
urlparse.urljoin(
self.TEST_URL,
'v3/projects/%s/users/%s/%s/%s' % (
project_id, user_id, self.collection_key, ref['id'])),
method,
headers=self.headers[method]) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
self.manager.revoke(role=ref['id'], project=project_id, user=user_id)
def test_domain_project_role_grant_fails(self):
user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex
domain_id = uuid.uuid4().hex
ref = self.new_ref()
self.assertRaises(
exceptions.ValidationError,
self.manager.grant,
role=ref['id'],
domain=domain_id,
project=project_id,
user=user_id)
def test_domain_project_role_list_fails(self):
user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex
domain_id = uuid.uuid4().hex
self.assertRaises(
exceptions.ValidationError,
self.manager.list,
domain=domain_id,
project=project_id,
user=user_id)
def test_domain_project_role_check_fails(self):
user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex
domain_id = uuid.uuid4().hex
ref = self.new_ref()
self.assertRaises(
exceptions.ValidationError,
self.manager.check,
role=ref['id'],
domain=domain_id,
project=project_id,
user=user_id)
def test_domain_project_role_revoke_fails(self):
user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex
domain_id = uuid.uuid4().hex
ref = self.new_ref()
self.assertRaises(
exceptions.ValidationError,
self.manager.revoke,
role=ref['id'],
domain=domain_id,
project=project_id,
user=user_id)

View File

@@ -91,7 +91,9 @@ class CrudTests(object):
}
}
self.headers['HEAD'] = self.headers['GET'].copy()
self.headers['DELETE'] = self.headers['GET'].copy()
self.headers['PUT'] = self.headers['GET'].copy()
self.headers['POST'] = self.headers['GET'].copy()
self.headers['POST']['Content-Type'] = 'application/json'
self.headers['PATCH'] = self.headers['POST'].copy()