2012-01-06 21:18:51 -08:00
|
|
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
2012-01-23 16:35:41 -08:00
|
|
|
import nose.exc
|
|
|
|
|
2012-01-09 10:28:55 -08:00
|
|
|
from keystone import config
|
|
|
|
from keystone import test
|
2011-10-26 17:01:11 -07:00
|
|
|
|
2011-12-28 13:58:49 -08:00
|
|
|
import default_fixtures
|
|
|
|
|
2012-01-06 21:00:41 -08:00
|
|
|
CONF = config.CONF
|
2011-12-22 13:09:43 -08:00
|
|
|
KEYSTONECLIENT_REPO = 'git://github.com/openstack/python-keystoneclient.git'
|
2011-10-26 17:01:11 -07:00
|
|
|
|
|
|
|
|
|
|
|
class CompatTestCase(test.TestCase):
|
2011-12-21 16:24:48 -08:00
|
|
|
def setUp(self):
|
|
|
|
super(CompatTestCase, self).setUp()
|
2011-10-26 17:01:11 -07:00
|
|
|
|
2011-12-29 17:40:32 -08:00
|
|
|
def _public_url(self):
|
|
|
|
public_port = self.public_server.socket_info['socket'][1]
|
2012-01-06 21:00:41 -08:00
|
|
|
CONF.public_port = public_port
|
2011-12-29 17:40:32 -08:00
|
|
|
return "http://localhost:%s/v2.0" % public_port
|
|
|
|
|
|
|
|
def _admin_url(self):
|
|
|
|
admin_port = self.admin_server.socket_info['socket'][1]
|
2012-01-06 21:00:41 -08:00
|
|
|
CONF.admin_port = admin_port
|
2011-12-29 17:40:32 -08:00
|
|
|
return "http://localhost:%s/v2.0" % admin_port
|
2011-12-22 13:09:43 -08:00
|
|
|
|
|
|
|
def _client(self, **kwargs):
|
|
|
|
from keystoneclient.v2_0 import client as ks_client
|
|
|
|
|
2011-12-29 17:40:32 -08:00
|
|
|
kc = ks_client.Client(endpoint=self._admin_url(),
|
|
|
|
auth_url=self._public_url(),
|
|
|
|
**kwargs)
|
2011-12-22 13:09:43 -08:00
|
|
|
kc.authenticate()
|
2011-12-29 17:40:32 -08:00
|
|
|
# have to manually overwrite the management url after authentication
|
|
|
|
kc.management_url = self._admin_url()
|
2011-12-22 13:09:43 -08:00
|
|
|
return kc
|
|
|
|
|
2011-10-26 17:01:11 -07:00
|
|
|
|
2012-01-06 21:31:12 -08:00
|
|
|
class KcMasterTestCase(CompatTestCase):
|
2011-12-21 16:24:48 -08:00
|
|
|
def setUp(self):
|
2012-01-06 21:31:12 -08:00
|
|
|
super(KcMasterTestCase, self).setUp()
|
2011-12-21 16:24:48 -08:00
|
|
|
|
|
|
|
revdir = test.checkout_vendor(KEYSTONECLIENT_REPO, 'master')
|
|
|
|
self.add_path(revdir)
|
|
|
|
from keystoneclient.v2_0 import client as ks_client
|
|
|
|
reload(ks_client)
|
|
|
|
|
2012-01-09 15:52:48 -08:00
|
|
|
self.public_app = self.loadapp('keystone', name='main')
|
|
|
|
self.admin_app = self.loadapp('keystone', name='admin')
|
2011-12-29 17:40:32 -08:00
|
|
|
|
2011-12-28 13:58:49 -08:00
|
|
|
self.load_backends()
|
|
|
|
self.load_fixtures(default_fixtures)
|
2011-12-21 16:24:48 -08:00
|
|
|
|
2012-01-09 15:52:48 -08:00
|
|
|
self.public_server = self.serveapp('keystone', name='main')
|
|
|
|
self.admin_server = self.serveapp('keystone', name='admin')
|
2011-12-21 16:24:48 -08:00
|
|
|
|
2011-12-28 13:58:49 -08:00
|
|
|
# TODO(termie): is_admin is being deprecated once the policy stuff
|
|
|
|
# is all working
|
|
|
|
# TODO(termie): add an admin user to the fixtures and use that user
|
|
|
|
# override the fixtures, for now
|
2012-01-09 13:45:41 -08:00
|
|
|
self.metadata_foobar = self.identity_api.update_metadata(
|
2011-12-21 16:24:48 -08:00
|
|
|
self.user_foo['id'], self.tenant_bar['id'],
|
2011-12-28 13:58:49 -08:00
|
|
|
dict(roles=['keystone_admin'], is_admin='1'))
|
2011-12-21 16:24:48 -08:00
|
|
|
|
2012-01-19 15:48:28 -08:00
|
|
|
def get_client(self, user_ref=None, tenant_ref=None):
|
2012-01-19 15:44:48 -08:00
|
|
|
if user_ref is None:
|
|
|
|
user_ref = self.user_foo
|
2012-01-19 15:48:28 -08:00
|
|
|
if tenant_ref is None:
|
|
|
|
for user in default_fixtures.USERS:
|
|
|
|
if user['id'] == user_ref['id']:
|
|
|
|
tenant_id = user['tenants'][0]
|
|
|
|
else:
|
|
|
|
tenant_id = tenant_ref['id']
|
2012-01-17 21:03:27 -08:00
|
|
|
|
2012-01-19 15:44:48 -08:00
|
|
|
return self._client(username=user_ref['name'],
|
|
|
|
password=user_ref['password'],
|
2012-01-19 15:48:28 -08:00
|
|
|
tenant_id=tenant_id)
|
2011-12-21 16:24:48 -08:00
|
|
|
|
2011-12-30 16:14:36 -08:00
|
|
|
def test_authenticate_tenant_name_and_tenants(self):
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2011-12-21 16:24:48 -08:00
|
|
|
tenants = client.tenants.list()
|
|
|
|
self.assertEquals(tenants[0].id, self.tenant_bar['id'])
|
|
|
|
|
|
|
|
def test_authenticate_tenant_id_and_tenants(self):
|
2012-01-24 00:56:53 -08:00
|
|
|
client = self._client(username=self.user_foo['name'],
|
|
|
|
password=self.user_foo['password'],
|
2012-01-12 12:45:08 -08:00
|
|
|
tenant_id='bar')
|
|
|
|
tenants = client.tenants.list()
|
|
|
|
self.assertEquals(tenants[0].id, self.tenant_bar['id'])
|
|
|
|
|
|
|
|
def test_authenticate_token_no_tenant(self):
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2012-01-12 12:45:08 -08:00
|
|
|
token = client.auth_token
|
|
|
|
token_client = self._client(token=token)
|
2012-01-19 16:25:26 -08:00
|
|
|
tenants = token_client.tenants.list()
|
2012-01-12 12:45:08 -08:00
|
|
|
self.assertEquals(tenants[0].id, self.tenant_bar['id'])
|
|
|
|
|
|
|
|
def test_authenticate_token_tenant_id(self):
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2012-01-12 12:45:08 -08:00
|
|
|
token = client.auth_token
|
|
|
|
token_client = self._client(token=token, tenant_id='bar')
|
2012-01-19 16:25:26 -08:00
|
|
|
tenants = token_client.tenants.list()
|
2012-01-12 12:45:08 -08:00
|
|
|
self.assertEquals(tenants[0].id, self.tenant_bar['id'])
|
|
|
|
|
|
|
|
def test_authenticate_token_tenant_name(self):
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2012-01-12 12:45:08 -08:00
|
|
|
token = client.auth_token
|
|
|
|
token_client = self._client(token=token, tenant_name='BAR')
|
2012-01-19 16:25:26 -08:00
|
|
|
tenants = token_client.tenants.list()
|
2011-12-21 16:24:48 -08:00
|
|
|
self.assertEquals(tenants[0].id, self.tenant_bar['id'])
|
2011-12-21 20:52:10 -08:00
|
|
|
|
2012-01-04 14:52:17 -08:00
|
|
|
# TODO(termie): I'm not really sure that this is testing much
|
2011-12-30 16:14:36 -08:00
|
|
|
def test_endpoints(self):
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2012-01-04 14:52:17 -08:00
|
|
|
token = client.auth_token
|
|
|
|
endpoints = client.tokens.endpoints(token)
|
2011-12-30 16:14:36 -08:00
|
|
|
|
2011-12-21 20:52:10 -08:00
|
|
|
# FIXME(ja): this test should require the "keystone:admin" roled
|
|
|
|
# (probably the role set via --keystone_admin_role flag)
|
|
|
|
# FIXME(ja): add a test that admin endpoint is only sent to admin user
|
2011-12-28 13:58:49 -08:00
|
|
|
# FIXME(ja): add a test that admin endpoint returns unauthorized if not
|
|
|
|
# admin
|
2011-12-30 16:14:36 -08:00
|
|
|
def test_tenant_create_update_and_delete(self):
|
|
|
|
from keystoneclient import exceptions as client_exceptions
|
|
|
|
|
|
|
|
test_tenant = 'new_tenant'
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2011-12-30 16:14:36 -08:00
|
|
|
tenant = client.tenants.create(test_tenant,
|
|
|
|
description="My new tenant!",
|
|
|
|
enabled=True)
|
|
|
|
self.assertEquals(tenant.name, test_tenant)
|
|
|
|
|
|
|
|
tenant = client.tenants.get(tenant.id)
|
|
|
|
self.assertEquals(tenant.name, test_tenant)
|
|
|
|
|
|
|
|
# TODO(devcamcar): update gives 404. why?
|
|
|
|
tenant = client.tenants.update(tenant.id,
|
|
|
|
tenant_name='new_tenant2',
|
|
|
|
enabled=False,
|
|
|
|
description='new description')
|
|
|
|
self.assertEquals(tenant.name, 'new_tenant2')
|
|
|
|
self.assertFalse(tenant.enabled)
|
|
|
|
self.assertEquals(tenant.description, 'new description')
|
|
|
|
|
2012-01-04 11:05:58 -08:00
|
|
|
client.tenants.delete(tenant.id)
|
2011-12-30 16:14:36 -08:00
|
|
|
self.assertRaises(client_exceptions.NotFound, client.tenants.get,
|
|
|
|
tenant.id)
|
|
|
|
|
|
|
|
def test_tenant_list(self):
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2011-12-30 16:14:36 -08:00
|
|
|
tenants = client.tenants.list()
|
|
|
|
self.assertEquals(len(tenants), 1)
|
|
|
|
|
2012-01-04 14:33:43 -08:00
|
|
|
def test_tenant_add_and_remove_user(self):
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2012-01-04 14:33:43 -08:00
|
|
|
client.roles.add_user_to_tenant(self.tenant_baz['id'],
|
|
|
|
self.user_foo['id'],
|
|
|
|
self.role_useless['id'])
|
|
|
|
tenant_refs = client.tenants.list()
|
|
|
|
self.assert_(self.tenant_baz['id'] in
|
|
|
|
[x.id for x in tenant_refs])
|
|
|
|
|
|
|
|
# get the "role_refs" so we get the proper id, this is how the clients
|
|
|
|
# do it
|
|
|
|
roleref_refs = client.roles.get_user_role_refs(self.user_foo['id'])
|
|
|
|
for roleref_ref in roleref_refs:
|
|
|
|
if (roleref_ref.roleId == self.role_useless['id'] and
|
|
|
|
roleref_ref.tenantId == self.tenant_baz['id']):
|
|
|
|
# use python's scope fall through to leave roleref_ref set
|
|
|
|
break
|
|
|
|
|
|
|
|
client.roles.remove_user_from_tenant(self.tenant_baz['id'],
|
|
|
|
self.user_foo['id'],
|
|
|
|
roleref_ref.id)
|
|
|
|
|
|
|
|
tenant_refs = client.tenants.list()
|
|
|
|
self.assert_(self.tenant_baz['id'] not in
|
|
|
|
[x.id for x in tenant_refs])
|
2011-12-30 16:14:36 -08:00
|
|
|
|
2012-01-19 17:15:03 -08:00
|
|
|
def test_invalid_password(self):
|
|
|
|
from keystoneclient import exceptions as client_exceptions
|
|
|
|
|
|
|
|
good_client = self._client(username=self.user_foo['name'],
|
|
|
|
password=self.user_foo['password'])
|
|
|
|
good_client.tenants.list()
|
|
|
|
|
2012-01-23 16:35:41 -08:00
|
|
|
self.assertRaises(client_exceptions.AuthorizationFailure,
|
2012-01-19 17:15:03 -08:00
|
|
|
self._client,
|
|
|
|
username=self.user_foo['name'],
|
|
|
|
password='invalid')
|
|
|
|
|
2011-12-30 16:14:36 -08:00
|
|
|
def test_user_create_update_delete(self):
|
|
|
|
from keystoneclient import exceptions as client_exceptions
|
|
|
|
|
2012-01-19 17:15:03 -08:00
|
|
|
test_username = 'new_user'
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2012-01-19 17:15:03 -08:00
|
|
|
user = client.users.create(test_username, 'password', 'user1@test.com')
|
|
|
|
self.assertEquals(user.name, test_username)
|
2011-12-30 16:14:36 -08:00
|
|
|
|
|
|
|
user = client.users.get(user.id)
|
2012-01-19 17:15:03 -08:00
|
|
|
self.assertEquals(user.name, test_username)
|
2011-12-30 16:14:36 -08:00
|
|
|
|
|
|
|
user = client.users.update_email(user, 'user2@test.com')
|
|
|
|
self.assertEquals(user.email, 'user2@test.com')
|
|
|
|
|
2012-01-04 15:47:52 -08:00
|
|
|
# NOTE(termie): update_enabled doesn't return anything, probably a bug
|
|
|
|
client.users.update_enabled(user, False)
|
|
|
|
user = client.users.get(user.id)
|
2011-12-30 16:14:36 -08:00
|
|
|
self.assertFalse(user.enabled)
|
|
|
|
|
2012-01-23 14:53:37 -08:00
|
|
|
self.assertRaises(client_exceptions.AuthorizationFailure,
|
2012-01-19 17:15:03 -08:00
|
|
|
self._client,
|
|
|
|
username=test_username,
|
|
|
|
password='password')
|
|
|
|
client.users.update_enabled(user, True)
|
|
|
|
|
2011-12-30 16:14:36 -08:00
|
|
|
user = client.users.update_password(user, 'password2')
|
|
|
|
|
2012-01-19 17:15:03 -08:00
|
|
|
test_client = self._client(username=test_username,
|
|
|
|
password='password2')
|
|
|
|
|
2011-12-30 16:14:36 -08:00
|
|
|
user = client.users.update_tenant(user, 'bar')
|
2012-01-19 17:15:03 -08:00
|
|
|
# TODO(ja): once keystonelight supports default tenant
|
|
|
|
# when you login without specifying tenant, the
|
|
|
|
# token should be scoped to tenant 'bar'
|
2011-12-30 16:14:36 -08:00
|
|
|
|
|
|
|
client.users.delete(user.id)
|
|
|
|
self.assertRaises(client_exceptions.NotFound, client.users.get,
|
|
|
|
user.id)
|
|
|
|
|
|
|
|
def test_user_list(self):
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2011-12-30 16:14:36 -08:00
|
|
|
users = client.users.list()
|
|
|
|
self.assertTrue(len(users) > 0)
|
2012-01-19 18:37:30 -08:00
|
|
|
user = users[0]
|
2012-01-23 14:30:22 -08:00
|
|
|
self.assertRaises(AttributeError, lambda: user.password)
|
2012-01-19 18:37:30 -08:00
|
|
|
|
|
|
|
def test_user_get(self):
|
|
|
|
client = self.get_client()
|
|
|
|
user = client.users.get(self.user_foo['id'])
|
2012-01-23 14:30:22 -08:00
|
|
|
self.assertRaises(AttributeError, lambda: user.password)
|
2011-12-30 16:14:36 -08:00
|
|
|
|
|
|
|
def test_role_get(self):
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2011-12-30 16:14:36 -08:00
|
|
|
role = client.roles.get('keystone_admin')
|
2012-01-04 17:31:54 -08:00
|
|
|
self.assertEquals(role.id, 'keystone_admin')
|
2011-12-30 16:14:36 -08:00
|
|
|
|
|
|
|
def test_role_create_and_delete(self):
|
|
|
|
from keystoneclient import exceptions as client_exceptions
|
|
|
|
|
|
|
|
test_role = 'new_role'
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2011-12-30 16:14:36 -08:00
|
|
|
role = client.roles.create(test_role)
|
|
|
|
self.assertEquals(role.name, test_role)
|
|
|
|
|
2012-01-04 17:31:54 -08:00
|
|
|
role = client.roles.get(role)
|
2011-12-30 16:14:36 -08:00
|
|
|
self.assertEquals(role.name, test_role)
|
|
|
|
|
2012-01-04 17:31:54 -08:00
|
|
|
client.roles.delete(role)
|
2011-12-30 16:14:36 -08:00
|
|
|
|
|
|
|
self.assertRaises(client_exceptions.NotFound, client.roles.get,
|
|
|
|
test_role)
|
|
|
|
|
|
|
|
def test_role_list(self):
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2011-12-30 16:14:36 -08:00
|
|
|
roles = client.roles.list()
|
|
|
|
# TODO(devcamcar): This assert should be more specific.
|
|
|
|
self.assertTrue(len(roles) > 0)
|
|
|
|
|
|
|
|
def test_roles_get_by_user(self):
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2012-01-04 17:31:54 -08:00
|
|
|
roles = client.roles.get_user_role_refs('foo')
|
2011-12-30 16:14:36 -08:00
|
|
|
self.assertTrue(len(roles) > 0)
|
|
|
|
|
2012-01-17 21:03:27 -08:00
|
|
|
def test_ec2_credential_crud(self):
|
|
|
|
client = self.get_client()
|
2012-01-16 17:36:38 -08:00
|
|
|
creds = client.ec2.list(self.user_foo['id'])
|
|
|
|
self.assertEquals(creds, [])
|
|
|
|
|
|
|
|
cred = client.ec2.create(self.user_foo['id'], self.tenant_bar['id'])
|
|
|
|
creds = client.ec2.list(self.user_foo['id'])
|
|
|
|
self.assertEquals(creds, [cred])
|
|
|
|
|
|
|
|
got = client.ec2.get(self.user_foo['id'], cred.access)
|
|
|
|
self.assertEquals(cred, got)
|
|
|
|
|
|
|
|
# FIXME(ja): need to test ec2 validation here
|
|
|
|
|
|
|
|
client.ec2.delete(self.user_foo['id'], cred.access)
|
|
|
|
creds = client.ec2.list(self.user_foo['id'])
|
|
|
|
self.assertEquals(creds, [])
|
|
|
|
|
2012-01-18 11:23:43 -08:00
|
|
|
def test_ec2_credentials_list_unauthorized_user(self):
|
2012-01-23 16:35:41 -08:00
|
|
|
raise nose.exc.SkipTest('TODO')
|
2012-01-17 21:03:27 -08:00
|
|
|
from keystoneclient import exceptions as client_exceptions
|
|
|
|
|
2012-01-19 15:44:48 -08:00
|
|
|
two = self.get_client(self.user_two)
|
|
|
|
self.assertRaises(client_exceptions.Unauthorized, two.ec2.list,
|
2012-01-17 21:03:27 -08:00
|
|
|
self.user_foo['id'])
|
|
|
|
|
2012-01-18 11:23:43 -08:00
|
|
|
def test_ec2_credentials_get_unauthorized_user(self):
|
2012-01-23 16:35:41 -08:00
|
|
|
raise nose.exc.SkipTest('TODO')
|
2012-01-17 21:48:31 -08:00
|
|
|
from keystoneclient import exceptions as client_exceptions
|
|
|
|
|
|
|
|
foo = self.get_client()
|
|
|
|
cred = foo.ec2.create(self.user_foo['id'], self.tenant_bar['id'])
|
|
|
|
|
2012-01-19 15:44:48 -08:00
|
|
|
two = self.get_client(self.user_two)
|
|
|
|
self.assertRaises(client_exceptions.Unauthorized, two.ec2.get,
|
2012-01-17 21:48:31 -08:00
|
|
|
self.user_foo['id'], cred.access)
|
2012-01-19 15:44:48 -08:00
|
|
|
|
2012-01-17 21:48:31 -08:00
|
|
|
foo.ec2.delete(self.user_foo['id'], cred.access)
|
|
|
|
|
2012-01-18 11:23:43 -08:00
|
|
|
def test_ec2_credentials_delete_unauthorized_user(self):
|
2012-01-23 16:35:41 -08:00
|
|
|
raise nose.exc.SkipTest('TODO')
|
2012-01-17 21:48:31 -08:00
|
|
|
from keystoneclient import exceptions as client_exceptions
|
|
|
|
|
|
|
|
foo = self.get_client()
|
|
|
|
cred = foo.ec2.create(self.user_foo['id'], self.tenant_bar['id'])
|
|
|
|
|
2012-01-19 15:44:48 -08:00
|
|
|
two = self.get_client(self.user_two)
|
|
|
|
self.assertRaises(client_exceptions.Unauthorized, two.ec2.delete,
|
2012-01-17 21:48:31 -08:00
|
|
|
self.user_foo['id'], cred.access)
|
2012-01-19 16:25:26 -08:00
|
|
|
|
2012-01-17 21:48:31 -08:00
|
|
|
foo.ec2.delete(self.user_foo['id'], cred.access)
|
|
|
|
|
2011-12-30 16:14:36 -08:00
|
|
|
def test_service_create_and_delete(self):
|
|
|
|
from keystoneclient import exceptions as client_exceptions
|
|
|
|
|
|
|
|
test_service = 'new_service'
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2011-12-30 16:14:36 -08:00
|
|
|
service = client.services.create(test_service, 'test', 'test')
|
|
|
|
self.assertEquals(service.name, test_service)
|
|
|
|
|
|
|
|
service = client.services.get(service.id)
|
|
|
|
self.assertEquals(service.name, test_service)
|
|
|
|
|
|
|
|
client.services.delete(service.id)
|
|
|
|
self.assertRaises(client_exceptions.NotFound, client.services.get,
|
|
|
|
service.id)
|
|
|
|
|
|
|
|
def test_service_list(self):
|
2012-01-17 21:03:27 -08:00
|
|
|
client = self.get_client()
|
2012-01-04 18:23:07 -08:00
|
|
|
test_service = 'new_service'
|
|
|
|
service = client.services.create(test_service, 'test', 'test')
|
2011-12-30 16:14:36 -08:00
|
|
|
services = client.services.list()
|
|
|
|
# TODO(devcamcar): This assert should be more specific.
|
|
|
|
self.assertTrue(len(services) > 0)
|