Add support for user groups
This initial change provides the support in keystoneclient to match the api specified in the blueprint. Keystone itself does not yet support these calls, so if you actually called these, then you would get an error from the server. However the changes should be benign in terms of other api calls. Blueprint keystone/+spec/user-groups Api changes: review.openstack.org/#/c/18138 DocImpact Change-Id: I9abfa82b39fa0c6d58fe0d22622944d3e6be39be
This commit is contained in:
@@ -19,6 +19,7 @@ from keystoneclient.v2_0 import client
|
||||
from keystoneclient.v3 import credentials
|
||||
from keystoneclient.v3 import endpoints
|
||||
from keystoneclient.v3 import domains
|
||||
from keystoneclient.v3 import groups
|
||||
from keystoneclient.v3 import policies
|
||||
from keystoneclient.v3 import projects
|
||||
from keystoneclient.v3 import roles
|
||||
@@ -68,6 +69,7 @@ class Client(client.Client):
|
||||
self.credentials = credentials.CredentialManager(self)
|
||||
self.endpoints = endpoints.EndpointManager(self)
|
||||
self.domains = domains.DomainManager(self)
|
||||
self.groups = groups.GroupManager(self)
|
||||
self.policies = policies.PolicyManager(self)
|
||||
self.projects = projects.ProjectManager(self)
|
||||
self.roles = roles.RoleManager(self)
|
||||
|
78
keystoneclient/v3/groups.py
Normal file
78
keystoneclient/v3/groups.py
Normal file
@@ -0,0 +1,78 @@
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# Copyright 2011 Nebula, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from keystoneclient import base
|
||||
|
||||
|
||||
class Group(base.Resource):
|
||||
"""Represents an Identity user group.
|
||||
|
||||
Attributes:
|
||||
* id: a uuid that identifies the group
|
||||
* name: group name
|
||||
* description: group description
|
||||
|
||||
"""
|
||||
def update(self, name=None, description=None):
|
||||
kwargs = {
|
||||
'name': name if name is not None else self.name,
|
||||
'description': (description
|
||||
if description is not None
|
||||
else self.description),
|
||||
}
|
||||
|
||||
try:
|
||||
retval = self.manager.update(self.id, **kwargs)
|
||||
self = retval
|
||||
except Exception:
|
||||
retval = None
|
||||
|
||||
return retval
|
||||
|
||||
|
||||
class GroupManager(base.CrudManager):
|
||||
"""Manager class for manipulating Identity groups."""
|
||||
resource_class = Group
|
||||
collection_key = 'groups'
|
||||
key = 'group'
|
||||
|
||||
def create(self, name, domain=None, description=None):
|
||||
return super(GroupManager, self).create(
|
||||
name=name,
|
||||
domain_id=base.getid(domain),
|
||||
description=description)
|
||||
|
||||
def list(self, user=None):
|
||||
if user:
|
||||
base_url = '/users/%s' % base.getid(user)
|
||||
else:
|
||||
base_url = None
|
||||
return super(GroupManager, self).list(
|
||||
base_url=base_url)
|
||||
|
||||
def get(self, group):
|
||||
return super(GroupManager, self).get(
|
||||
group_id=base.getid(group))
|
||||
|
||||
def update(self, group, name=None, description=None):
|
||||
return super(GroupManager, self).update(
|
||||
group_id=base.getid(group),
|
||||
name=name,
|
||||
description=description)
|
||||
|
||||
def delete(self, group):
|
||||
return super(GroupManager, self).delete(
|
||||
group_id=base.getid(group))
|
@@ -35,23 +35,37 @@ class RoleManager(base.CrudManager):
|
||||
collection_key = 'roles'
|
||||
key = 'role'
|
||||
|
||||
def _role_grants_base_url(self, user, domain, project):
|
||||
params = {'user_id': base.getid(user)}
|
||||
def _role_grants_base_url(self, user, group, domain, project):
|
||||
# When called, we have already checked that only one of user & group
|
||||
# and one of domain & project have been specified
|
||||
params = {}
|
||||
|
||||
if domain:
|
||||
params['domain_id'] = base.getid(domain)
|
||||
base_url = '/domains/%(domain_id)s/users/%(user_id)s'
|
||||
elif project:
|
||||
if project:
|
||||
params['project_id'] = base.getid(project)
|
||||
base_url = '/projects/%(project_id)s/users/%(user_id)s'
|
||||
base_url = '/projects/%(project_id)s'
|
||||
elif domain:
|
||||
params['domain_id'] = base.getid(domain)
|
||||
base_url = '/domains/%(domain_id)s'
|
||||
|
||||
if user:
|
||||
params['user_id'] = base.getid(user)
|
||||
base_url += '/users/%(user_id)s'
|
||||
elif group:
|
||||
params['group_id'] = base.getid(group)
|
||||
base_url += '/groups/%(group_id)s'
|
||||
|
||||
return base_url % params
|
||||
|
||||
def _require_domain_or_project(self, domain, project):
|
||||
def _require_domain_xor_project(self, domain, project):
|
||||
if (domain and project) or (not domain and not project):
|
||||
msg = 'Specify either a domain or project, not both'
|
||||
raise exceptions.ValidationError(msg)
|
||||
|
||||
def _require_user_xor_group(self, user, group):
|
||||
if (user and group) or (not user and not group):
|
||||
msg = 'Specify either a user or group, not both'
|
||||
raise exceptions.ValidationError(msg)
|
||||
|
||||
def create(self, name):
|
||||
return super(RoleManager, self).create(
|
||||
name=name)
|
||||
@@ -60,19 +74,22 @@ class RoleManager(base.CrudManager):
|
||||
return super(RoleManager, self).get(
|
||||
role_id=base.getid(role))
|
||||
|
||||
def list(self, user=None, domain=None, project=None):
|
||||
def list(self, user=None, group=None, domain=None, project=None):
|
||||
"""Lists roles and role grants.
|
||||
|
||||
If no arguments are provided, all roles in the system will be listed.
|
||||
|
||||
If a user is specified, you must also specify either a domain or
|
||||
project to list role grants on that pair.
|
||||
If a user or group is specified, you must also specify either a
|
||||
domain or project to list role grants on that pair.
|
||||
"""
|
||||
|
||||
if user:
|
||||
self._require_domain_or_project(domain, project)
|
||||
if user or group:
|
||||
self._require_user_xor_group(user, group)
|
||||
self._require_domain_xor_project(domain, project)
|
||||
|
||||
return super(RoleManager, self).list(
|
||||
base_url=self._role_grants_base_url(user, domain, project))
|
||||
base_url=self._role_grants_base_url(user, group,
|
||||
domain, project))
|
||||
|
||||
return super(RoleManager, self).list()
|
||||
|
||||
@@ -85,26 +102,29 @@ class RoleManager(base.CrudManager):
|
||||
return super(RoleManager, self).delete(
|
||||
role_id=base.getid(role))
|
||||
|
||||
def grant(self, role, user, domain=None, project=None):
|
||||
"""Grants a role to a user on either a domain or project."""
|
||||
self._require_domain_or_project(domain, project)
|
||||
def grant(self, role, user=None, group=None, domain=None, project=None):
|
||||
"""Grants a role to a user or group on a domain or project."""
|
||||
self._require_domain_xor_project(domain, project)
|
||||
self._require_user_xor_group(user, group)
|
||||
|
||||
return super(RoleManager, self).put(
|
||||
base_url=self._role_grants_base_url(user, domain, project),
|
||||
base_url=self._role_grants_base_url(user, group, domain, project),
|
||||
role_id=base.getid(role))
|
||||
|
||||
def check(self, role, user, domain=None, project=None):
|
||||
"""Grants a role to a user on either a domain or project."""
|
||||
self._require_domain_or_project(domain, project)
|
||||
def check(self, role, user=None, group=None, domain=None, project=None):
|
||||
"""Checks if a user or group has a role on a domain or project."""
|
||||
self._require_domain_xor_project(domain, project)
|
||||
self._require_user_xor_group(user, group)
|
||||
|
||||
return super(RoleManager, self).head(
|
||||
base_url=self._role_grants_base_url(user, domain, project),
|
||||
base_url=self._role_grants_base_url(user, group, domain, project),
|
||||
role_id=base.getid(role))
|
||||
|
||||
def revoke(self, role, user, domain=None, project=None):
|
||||
"""Revokes a role from a user on either a domain or project."""
|
||||
self._require_domain_or_project(domain, project)
|
||||
def revoke(self, role, user=None, group=None, domain=None, project=None):
|
||||
"""Revokes a role from a user or group on a domain or project."""
|
||||
self._require_domain_xor_project(domain, project)
|
||||
self._require_user_xor_group(user, group)
|
||||
|
||||
return super(RoleManager, self).delete(
|
||||
base_url=self._role_grants_base_url(user, domain, project),
|
||||
base_url=self._role_grants_base_url(user, group, domain, project),
|
||||
role_id=base.getid(role))
|
||||
|
@@ -33,6 +33,11 @@ class UserManager(base.CrudManager):
|
||||
collection_key = 'users'
|
||||
key = 'user'
|
||||
|
||||
def _require_user_and_group(self, user, group):
|
||||
if not (user and group):
|
||||
msg = 'Specify both a user and a group'
|
||||
raise exceptions.ValidationError(msg)
|
||||
|
||||
def create(self, name, domain=None, project=None, password=None,
|
||||
email=None, description=None, enabled=True):
|
||||
return super(UserManager, self).create(
|
||||
@@ -44,8 +49,14 @@ class UserManager(base.CrudManager):
|
||||
description=description,
|
||||
enabled=enabled)
|
||||
|
||||
def list(self, project=None, domain=None):
|
||||
def list(self, project=None, domain=None, group=None):
|
||||
if group:
|
||||
base_url = '/groups/%s' % base.getid(group)
|
||||
else:
|
||||
base_url = None
|
||||
|
||||
return super(UserManager, self).list(
|
||||
base_url=base_url,
|
||||
domain_id=base.getid(domain),
|
||||
project_id=base.getid(project))
|
||||
|
||||
@@ -65,6 +76,30 @@ class UserManager(base.CrudManager):
|
||||
description=description,
|
||||
enabled=enabled)
|
||||
|
||||
def add_to_group(self, user, group):
|
||||
self._require_user_and_group(user, group)
|
||||
|
||||
base_url = '/groups/%s' % base.getid(group)
|
||||
return super(UserManager, self).put(
|
||||
base_url=base_url,
|
||||
user_id=base.getid(user))
|
||||
|
||||
def check_in_group(self, user, group):
|
||||
self._require_user_and_group(user, group)
|
||||
|
||||
base_url = '/groups/%s' % base.getid(group)
|
||||
return super(UserManager, self).head(
|
||||
base_url=base_url,
|
||||
user_id=base.getid(user))
|
||||
|
||||
def remove_from_group(self, user, group):
|
||||
self._require_user_and_group(user, group)
|
||||
|
||||
base_url = '/groups/%s' % base.getid(group)
|
||||
return super(UserManager, self).delete(
|
||||
base_url=base_url,
|
||||
user_id=base.getid(user))
|
||||
|
||||
def delete(self, user):
|
||||
return super(UserManager, self).delete(
|
||||
user_id=base.getid(user))
|
||||
|
63
tests/v3/test_groups.py
Normal file
63
tests/v3/test_groups.py
Normal file
@@ -0,0 +1,63 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012 OpenStack LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import urlparse
|
||||
import uuid
|
||||
|
||||
import requests
|
||||
|
||||
from keystoneclient.v3 import groups
|
||||
from tests.v3 import utils
|
||||
|
||||
|
||||
class GroupTests(utils.TestCase, utils.CrudTests):
|
||||
def setUp(self):
|
||||
super(GroupTests, self).setUp()
|
||||
self.additionalSetUp()
|
||||
self.key = 'group'
|
||||
self.collection_key = 'groups'
|
||||
self.model = groups.Group
|
||||
self.manager = self.client.groups
|
||||
|
||||
def new_ref(self, **kwargs):
|
||||
kwargs = super(GroupTests, self).new_ref(**kwargs)
|
||||
kwargs.setdefault('name', uuid.uuid4().hex)
|
||||
return kwargs
|
||||
|
||||
def test_list_groups_for_user(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
ref_list = [self.new_ref(), self.new_ref()]
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 200,
|
||||
"text": self.serialize(ref_list),
|
||||
})
|
||||
|
||||
method = 'GET'
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/users/%s/%s' % (
|
||||
user_id, self.collection_key)),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
returned_list = self.manager.list(user=user_id)
|
||||
self.assertTrue(len(returned_list))
|
||||
[self.assertTrue(isinstance(r, self.model)) for r in returned_list]
|
@@ -1,3 +1,19 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012 OpenStack LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import urlparse
|
||||
import uuid
|
||||
@@ -46,6 +62,29 @@ class RoleTests(utils.TestCase, utils.CrudTests):
|
||||
|
||||
self.manager.grant(role=ref['id'], domain=domain_id, user=user_id)
|
||||
|
||||
def test_domain_group_role_grant(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
domain_id = uuid.uuid4().hex
|
||||
ref = self.new_ref()
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 201,
|
||||
"text": '',
|
||||
})
|
||||
|
||||
method = 'PUT'
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/domains/%s/groups/%s/%s/%s' % (
|
||||
domain_id, group_id, self.collection_key, ref['id'])),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.manager.grant(role=ref['id'], domain=domain_id, group=group_id)
|
||||
|
||||
def test_domain_role_list(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
domain_id = uuid.uuid4().hex
|
||||
@@ -69,12 +108,35 @@ class RoleTests(utils.TestCase, utils.CrudTests):
|
||||
|
||||
self.manager.list(domain=domain_id, user=user_id)
|
||||
|
||||
def test_domain_group_role_list(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
domain_id = uuid.uuid4().hex
|
||||
ref_list = [self.new_ref(), self.new_ref()]
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 200,
|
||||
"text": self.serialize(ref_list),
|
||||
})
|
||||
|
||||
method = 'GET'
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/domains/%s/groups/%s/%s' % (
|
||||
domain_id, group_id, self.collection_key)),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.manager.list(domain=domain_id, group=group_id)
|
||||
|
||||
def test_domain_role_check(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
domain_id = uuid.uuid4().hex
|
||||
ref = self.new_ref()
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 200,
|
||||
"status_code": 204,
|
||||
"text": '',
|
||||
})
|
||||
|
||||
@@ -90,7 +152,32 @@ class RoleTests(utils.TestCase, utils.CrudTests):
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.manager.check(role=ref['id'], domain=domain_id, user=user_id)
|
||||
self.manager.check(role=ref['id'], domain=domain_id,
|
||||
user=user_id)
|
||||
|
||||
def test_domain_group_role_check(self):
|
||||
return
|
||||
group_id = uuid.uuid4().hex
|
||||
domain_id = uuid.uuid4().hex
|
||||
ref = self.new_ref()
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 204,
|
||||
"text": '',
|
||||
})
|
||||
|
||||
method = 'HEAD'
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/domains/%s/groups/%s/%s/%s' % (
|
||||
domain_id, group_id, self.collection_key, ref['id'])),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.manager.check(role=ref['id'], domain=domain_id, group=group_id)
|
||||
|
||||
def test_domain_role_revoke(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
@@ -115,6 +202,29 @@ class RoleTests(utils.TestCase, utils.CrudTests):
|
||||
|
||||
self.manager.revoke(role=ref['id'], domain=domain_id, user=user_id)
|
||||
|
||||
def test_domain_group_role_revoke(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
domain_id = uuid.uuid4().hex
|
||||
ref = self.new_ref()
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 204,
|
||||
"text": '',
|
||||
})
|
||||
|
||||
method = 'DELETE'
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/domains/%s/groups/%s/%s/%s' % (
|
||||
domain_id, group_id, self.collection_key, ref['id'])),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.manager.revoke(role=ref['id'], domain=domain_id, group=group_id)
|
||||
|
||||
def test_project_role_grant(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
project_id = uuid.uuid4().hex
|
||||
@@ -138,6 +248,29 @@ class RoleTests(utils.TestCase, utils.CrudTests):
|
||||
|
||||
self.manager.grant(role=ref['id'], project=project_id, user=user_id)
|
||||
|
||||
def test_project_group_role_grant(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
project_id = uuid.uuid4().hex
|
||||
ref = self.new_ref()
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 201,
|
||||
"text": '',
|
||||
})
|
||||
|
||||
method = 'PUT'
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/projects/%s/groups/%s/%s/%s' % (
|
||||
project_id, group_id, self.collection_key, ref['id'])),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.manager.grant(role=ref['id'], project=project_id, group=group_id)
|
||||
|
||||
def test_project_role_list(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
project_id = uuid.uuid4().hex
|
||||
@@ -161,6 +294,29 @@ class RoleTests(utils.TestCase, utils.CrudTests):
|
||||
|
||||
self.manager.list(project=project_id, user=user_id)
|
||||
|
||||
def test_project_group_role_list(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
project_id = uuid.uuid4().hex
|
||||
ref_list = [self.new_ref(), self.new_ref()]
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 200,
|
||||
"text": self.serialize(ref_list),
|
||||
})
|
||||
|
||||
method = 'GET'
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/projects/%s/groups/%s/%s' % (
|
||||
project_id, group_id, self.collection_key)),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.manager.list(project=project_id, group=group_id)
|
||||
|
||||
def test_project_role_check(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
project_id = uuid.uuid4().hex
|
||||
@@ -184,6 +340,29 @@ class RoleTests(utils.TestCase, utils.CrudTests):
|
||||
|
||||
self.manager.check(role=ref['id'], project=project_id, user=user_id)
|
||||
|
||||
def test_project_group_role_check(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
project_id = uuid.uuid4().hex
|
||||
ref = self.new_ref()
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 200,
|
||||
"text": '',
|
||||
})
|
||||
|
||||
method = 'HEAD'
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/projects/%s/groups/%s/%s/%s' % (
|
||||
project_id, group_id, self.collection_key, ref['id'])),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.manager.check(role=ref['id'], project=project_id, group=group_id)
|
||||
|
||||
def test_project_role_revoke(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
project_id = uuid.uuid4().hex
|
||||
@@ -207,6 +386,29 @@ class RoleTests(utils.TestCase, utils.CrudTests):
|
||||
|
||||
self.manager.revoke(role=ref['id'], project=project_id, user=user_id)
|
||||
|
||||
def test_project_group_role_revoke(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
project_id = uuid.uuid4().hex
|
||||
ref = self.new_ref()
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 204,
|
||||
"text": '',
|
||||
})
|
||||
|
||||
method = 'DELETE'
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/projects/%s/groups/%s/%s/%s' % (
|
||||
project_id, group_id, self.collection_key, ref['id'])),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.manager.revoke(role=ref['id'], project=project_id, group=group_id)
|
||||
|
||||
def test_domain_project_role_grant_fails(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
project_id = uuid.uuid4().hex
|
||||
@@ -260,3 +462,57 @@ class RoleTests(utils.TestCase, utils.CrudTests):
|
||||
domain=domain_id,
|
||||
project=project_id,
|
||||
user=user_id)
|
||||
|
||||
def test_user_group_role_grant_fails(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
group_id = uuid.uuid4().hex
|
||||
project_id = uuid.uuid4().hex
|
||||
ref = self.new_ref()
|
||||
|
||||
self.assertRaises(
|
||||
exceptions.ValidationError,
|
||||
self.manager.grant,
|
||||
role=ref['id'],
|
||||
project=project_id,
|
||||
group=group_id,
|
||||
user=user_id)
|
||||
|
||||
def test_user_group_role_list_fails(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
group_id = uuid.uuid4().hex
|
||||
project_id = uuid.uuid4().hex
|
||||
|
||||
self.assertRaises(
|
||||
exceptions.ValidationError,
|
||||
self.manager.list,
|
||||
project=project_id,
|
||||
group=group_id,
|
||||
user=user_id)
|
||||
|
||||
def test_user_group_role_check_fails(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
group_id = uuid.uuid4().hex
|
||||
project_id = uuid.uuid4().hex
|
||||
ref = self.new_ref()
|
||||
|
||||
self.assertRaises(
|
||||
exceptions.ValidationError,
|
||||
self.manager.check,
|
||||
role=ref['id'],
|
||||
project=project_id,
|
||||
group=group_id,
|
||||
user=user_id)
|
||||
|
||||
def test_user_group_role_revoke_fails(self):
|
||||
user_id = uuid.uuid4().hex
|
||||
group_id = uuid.uuid4().hex
|
||||
project_id = uuid.uuid4().hex
|
||||
ref = self.new_ref()
|
||||
|
||||
self.assertRaises(
|
||||
exceptions.ValidationError,
|
||||
self.manager.revoke,
|
||||
role=ref['id'],
|
||||
project=project_id,
|
||||
group=group_id,
|
||||
user=user_id)
|
||||
|
@@ -1,5 +1,25 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012 OpenStack LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import urlparse
|
||||
import uuid
|
||||
|
||||
import requests
|
||||
|
||||
from keystoneclient.v3 import users
|
||||
from tests.v3 import utils
|
||||
|
||||
@@ -21,3 +41,93 @@ class UserTests(utils.TestCase, utils.CrudTests):
|
||||
kwargs.setdefault('name', uuid.uuid4().hex)
|
||||
kwargs.setdefault('project_id', uuid.uuid4().hex)
|
||||
return kwargs
|
||||
|
||||
def test_add_user_to_group(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
ref = self.new_ref()
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 204,
|
||||
"text": '',
|
||||
})
|
||||
|
||||
method = 'PUT'
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/groups/%s/%s/%s' % (
|
||||
group_id, self.collection_key, ref['id'])),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.manager.add_to_group(user=ref['id'], group=group_id)
|
||||
|
||||
def test_list_users_in_group(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
ref_list = [self.new_ref(), self.new_ref()]
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 200,
|
||||
"text": self.serialize(ref_list),
|
||||
})
|
||||
|
||||
method = 'GET'
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/groups/%s/%s' % (
|
||||
group_id, self.collection_key)),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
returned_list = self.manager.list(group=group_id)
|
||||
self.assertTrue(len(returned_list))
|
||||
[self.assertTrue(isinstance(r, self.model)) for r in returned_list]
|
||||
|
||||
def test_check_user_in_group(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
ref = self.new_ref()
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 204,
|
||||
"text": '',
|
||||
})
|
||||
|
||||
method = 'HEAD'
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/groups/%s/%s/%s' % (
|
||||
group_id, self.collection_key, ref['id'])),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.manager.check_in_group(user=ref['id'], group=group_id)
|
||||
|
||||
def test_remove_user_from_group(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
ref = self.new_ref()
|
||||
resp = utils.TestResponse({
|
||||
"status_code": 204,
|
||||
"text": '',
|
||||
})
|
||||
|
||||
method = 'DELETE'
|
||||
kwargs = copy.copy(self.TEST_REQUEST_BASE)
|
||||
kwargs['headers'] = self.headers[method]
|
||||
requests.request(
|
||||
method,
|
||||
urlparse.urljoin(
|
||||
self.TEST_URL,
|
||||
'v3/groups/%s/%s/%s' % (
|
||||
group_id, self.collection_key, ref['id'])),
|
||||
**kwargs).AndReturn((resp))
|
||||
self.mox.ReplayAll()
|
||||
|
||||
self.manager.remove_from_group(user=ref['id'], group=group_id)
|
||||
|
Reference in New Issue
Block a user