fix keystoneclient tests

This commit is contained in:
termie 2012-01-31 21:31:36 -08:00
parent c6e30eb5a1
commit 6fd68e1a38
4 changed files with 145 additions and 64 deletions

View File

@ -110,7 +110,7 @@ class CrudExtension(wsgi.ExtensionRouter):
conditions=dict(method=["PUT"]))
mapper.connect(
"/tenants/{tenant_id}/users/{user_id}/roles/OS-KSADM/{role_id}",
controller=role_controller, action="delete_role_from_user",
controller=role_controller, action="remove_role_from_user",
conditions=dict(method=["DELETE"]))
# Service Operations

View File

@ -366,8 +366,20 @@ class RoleController(wsgi.Application):
self.policy_api = policy.Manager()
super(RoleController, self).__init__()
# COMPAT(essex-3)
def get_user_roles(self, context, user_id, tenant_id=None):
raise NotImplemented()
"""Get the roles for a user and tenant pair.
Since we're trying to ignore the idea of user-only roles we're
not implementing them in hopes that the idea will die off.
"""
if tenant_id is None:
raise Exception('User roles not supported: tenant_id required')
roles = self.identity_api.get_roles_for_user_and_tenant(
context, user_id, tenant_id)
return {'roles': [self.identity_api.get_role(context, x)
for x in roles]}
# CRUD extension
def get_role(self, context, role_id):
@ -395,6 +407,47 @@ class RoleController(wsgi.Application):
# TODO(termie): probably inefficient at some point
return {'roles': roles}
def add_role_to_user(self, context, user_id, role_id, tenant_id=None):
"""Add a role to a user and tenant pair.
Since we're trying to ignore the idea of user-only roles we're
not implementing them in hopes that the idea will die off.
"""
self.assert_admin(context)
if tenant_id is None:
raise Exception('User roles not supported: tenant_id required')
# This still has the weird legacy semantics that adding a role to
# a user also adds them to a tenant
self.identity_api.add_user_to_tenant(context, tenant_id, user_id)
self.identity_api.add_role_to_user_and_tenant(
context, user_id, tenant_id, role_id)
role_ref = self.identity_api.get_role(context, role_id)
return {'role': role_ref}
def remove_role_from_user(self, context, user_id, role_id, tenant_id=None):
"""Remove a role from a user and tenant pair.
Since we're trying to ignore the idea of user-only roles we're
not implementing them in hopes that the idea will die off.
"""
self.assert_admin(context)
if tenant_id is None:
raise Exception('User roles not supported: tenant_id required')
# This still has the weird legacy semantics that adding a role to
# a user also adds them to a tenant
self.identity_api.remove_role_from_user_and_tenant(
context, user_id, tenant_id, role_id)
roles = self.identity_api.get_roles_for_user_and_tenant(
context, user_id, tenant_id)
if not roles:
self.identity_api.remove_user_from_tenant(
context, tenant_id, user_id)
return
# COMPAT(diablo): CRUD extension
def get_role_refs(self, context, user_id):
"""Ultimate hack to get around having to make role_refs first-class.
@ -420,6 +473,7 @@ class RoleController(wsgi.Application):
o.append(ref)
return {'roles': o}
# COMPAT(diablo): CRUD extension
def create_role_ref(self, context, user_id, role):
"""This is actually used for adding a user to a tenant.
@ -437,6 +491,7 @@ class RoleController(wsgi.Application):
role_ref = self.identity_api.get_role(context, role_id)
return {'role': role_ref}
# COMPAT(diablo): CRUD extension
def delete_role_ref(self, context, user_id, role_ref_id):
"""This is actually used for deleting a user from a tenant.

View File

@ -90,6 +90,8 @@ class JsonBodyMiddleware(wsgi.Middleware):
return
params_parsed = json.loads(params_json)
if not params_parsed:
params_parsed = {}
params = {}
for k, v in params_parsed.iteritems():
if k in ('self', 'context'):

View File

@ -15,32 +15,6 @@ class CompatTestCase(test.TestCase):
def setUp(self):
super(CompatTestCase, self).setUp()
def _public_url(self):
public_port = self.public_server.socket_info['socket'][1]
CONF.public_port = public_port
return "http://localhost:%s/v2.0" % public_port
def _admin_url(self):
admin_port = self.admin_server.socket_info['socket'][1]
CONF.admin_port = admin_port
return "http://localhost:%s/v2.0" % admin_port
def _client(self, **kwargs):
from keystoneclient.v2_0 import client as ks_client
kc = ks_client.Client(endpoint=self._admin_url(),
auth_url=self._public_url(),
**kwargs)
kc.authenticate()
# have to manually overwrite the management url after authentication
kc.management_url = self._admin_url()
return kc
class KcMasterTestCase(CompatTestCase):
def setUp(self):
super(KcMasterTestCase, self).setUp()
revdir = test.checkout_vendor(*self.get_checkout())
self.add_path(revdir)
self.clear_module('keystoneclient')
@ -62,8 +36,26 @@ class KcMasterTestCase(CompatTestCase):
self.user_foo['id'], self.tenant_bar['id'],
dict(roles=['keystone_admin'], is_admin='1'))
def get_checkout(self):
return KEYSTONECLIENT_REPO, 'master'
def _public_url(self):
public_port = self.public_server.socket_info['socket'][1]
CONF.public_port = public_port
return "http://localhost:%s/v2.0" % public_port
def _admin_url(self):
admin_port = self.admin_server.socket_info['socket'][1]
CONF.admin_port = admin_port
return "http://localhost:%s/v2.0" % admin_port
def _client(self, **kwargs):
from keystoneclient.v2_0 import client as ks_client
kc = ks_client.Client(endpoint=self._admin_url(),
auth_url=self._public_url(),
**kwargs)
kc.authenticate()
# have to manually overwrite the management url after authentication
kc.management_url = self._admin_url()
return kc
def get_client(self, user_ref=None, tenant_ref=None):
if user_ref is None:
@ -79,6 +71,10 @@ class KcMasterTestCase(CompatTestCase):
password=user_ref['password'],
tenant_id=tenant_id)
class KeystoneClientTests(object):
"""Tests for all versions of keystoneclient."""
def test_authenticate_tenant_name_and_tenants(self):
client = self.get_client()
tenants = client.tenants.list()
@ -169,33 +165,6 @@ class KcMasterTestCase(CompatTestCase):
tenants = client.tenants.list()
self.assertEquals(len(tenants), 1)
def test_tenant_add_and_remove_user(self):
client = self.get_client()
client.roles.add_user_to_tenant(tenant_id=self.tenant_baz['id'],
user_id=self.user_foo['id'],
role_id=self.role_useless['id'])
tenant_refs = client.tenants.list()
self.assert_(self.tenant_baz['id'] in
[x.id for x in tenant_refs])
# get the "role_refs" so we get the proper id, this is how the clients
# do it
roleref_refs = client.roles.get_user_role_refs(
user_id=self.user_foo['id'])
for roleref_ref in roleref_refs:
if (roleref_ref.roleId == self.role_useless['id'] and
roleref_ref.tenantId == self.tenant_baz['id']):
# use python's scope fall through to leave roleref_ref set
break
client.roles.remove_user_from_tenant(tenant_id=self.tenant_baz['id'],
user_id=self.user_foo['id'],
role_id=roleref_ref.id)
tenant_refs = client.tenants.list()
self.assert_(self.tenant_baz['id'] not in
[x.id for x in tenant_refs])
def test_invalid_password(self):
from keystoneclient import exceptions as client_exceptions
@ -280,7 +249,7 @@ class KcMasterTestCase(CompatTestCase):
client.roles.delete(role=role.id)
self.assertRaises(client_exceptions.NotFound, client.roles.get,
role=test_role)
role=role.id)
def test_role_list(self):
client = self.get_client()
@ -288,11 +257,6 @@ class KcMasterTestCase(CompatTestCase):
# TODO(devcamcar): This assert should be more specific.
self.assertTrue(len(roles) > 0)
def test_roles_get_by_user(self):
client = self.get_client()
roles = client.roles.get_user_role_refs(user_id='foo')
self.assertTrue(len(roles) > 0)
def test_ec2_credential_crud(self):
client = self.get_client()
creds = client.ec2.list(user_id=self.user_foo['id'])
@ -433,6 +397,66 @@ class KcMasterTestCase(CompatTestCase):
# TODO(ja): determine what else todo
class KcEssex3TestCase(KcMasterTestCase):
class KcMasterTestCase(CompatTestCase, KeystoneClientTests):
def get_checkout(self):
return KEYSTONECLIENT_REPO, 'master'
def test_tenant_add_and_remove_user(self):
client = self.get_client()
client.roles.add_user_role(tenant=self.tenant_baz['id'],
user=self.user_foo['id'],
role=self.role_useless['id'])
tenant_refs = client.tenants.list()
self.assert_(self.tenant_baz['id'] in
[x.id for x in tenant_refs])
client.roles.remove_user_role(tenant=self.tenant_baz['id'],
user=self.user_foo['id'],
role=self.role_useless['id'])
tenant_refs = client.tenants.list()
self.assert_(self.tenant_baz['id'] not in
[x.id for x in tenant_refs])
def test_roles_get_by_user(self):
client = self.get_client()
roles = client.roles.roles_for_user(user=self.user_foo['id'],
tenant=self.tenant_bar['id'])
self.assertTrue(len(roles) > 0)
class KcEssex3TestCase(CompatTestCase, KeystoneClientTests):
def get_checkout(self):
return KEYSTONECLIENT_REPO, 'essex-3'
def test_tenant_add_and_remove_user(self):
client = self.get_client()
client.roles.add_user_to_tenant(tenant_id=self.tenant_baz['id'],
user_id=self.user_foo['id'],
role_id=self.role_useless['id'])
tenant_refs = client.tenants.list()
self.assert_(self.tenant_baz['id'] in
[x.id for x in tenant_refs])
# get the "role_refs" so we get the proper id, this is how the clients
# do it
roleref_refs = client.roles.get_user_role_refs(
user_id=self.user_foo['id'])
for roleref_ref in roleref_refs:
if (roleref_ref.roleId == self.role_useless['id'] and
roleref_ref.tenantId == self.tenant_baz['id']):
# use python's scope fall through to leave roleref_ref set
break
client.roles.remove_user_from_tenant(tenant_id=self.tenant_baz['id'],
user_id=self.user_foo['id'],
role_id=roleref_ref.id)
tenant_refs = client.tenants.list()
self.assert_(self.tenant_baz['id'] not in
[x.id for x in tenant_refs])
def test_roles_get_by_user(self):
client = self.get_client()
roles = client.roles.get_user_role_refs(user_id='foo')
self.assertTrue(len(roles) > 0)