add a bunch of basic tests for the cli

This commit is contained in:
termie 2012-01-24 23:01:51 -08:00
parent 608b9a270b
commit d4f2bf5fde
5 changed files with 249 additions and 40 deletions

View File

@ -1,8 +1,10 @@
from __future__ import absolute_import
import json
import logging
import os
import sys
import StringIO
import textwrap
import cli.app
@ -19,7 +21,7 @@ config.register_cli_str('endpoint',
default='http://localhost:$admin_port/v2.0',
#group='ks',
conf=CONF)
config.register_cli_str('auth_token',
config.register_cli_str('auth-token',
default='$admin_token',
#group='ks',
help='asdasd',
@ -113,7 +115,29 @@ class ClientCommand(BaseApp):
if CONF.id_only and getattr(resp, 'id'):
print resp.id
return
print resp
if resp is None:
return
# NOTE(termie): this is ugly but it is mostly because the keystoneclient
# code doesn't give us very serializable instance objects
if type(resp) in [type(list()), type(tuple())]:
o = []
for r in resp:
d = {}
for k, v in sorted(r.__dict__.iteritems()):
if k[0] == '_' or k == 'manager':
continue
d[k] = v
o.append(d)
else:
o = {}
for k, v in sorted(resp.__dict__.iteritems()):
if k[0] == '_' or k == 'manager':
continue
o[k] = v
print json.dumps(o)
def print_help(self):
CONF.set_usage(CONF.usage.replace(
@ -175,6 +199,122 @@ CMDS = {'db_sync': DbSync,
}
class CommandLineGenerator(object):
"""A keystoneclient lookalike to generate keystone-manage commands.
One would use it like so:
>>> gen = CommandLineGenerator(id_only=None)
>>> cl = gen.ec2.create(user_id='foo', tenant_id='foo')
>>> cl.to_argv()
... ['keystone-manage',
'--id-only',
'ec2',
'create',
'user_id=foo',
'tenant_id=foo']
"""
cmd = 'keystone-manage'
def __init__(self, cmd=None, execute=False, **kw):
if cmd:
self.cmd = cmd
self.flags = kw
self.execute = execute
def __getattr__(self, key):
return _Manager(self, key)
class _Manager(object):
def __init__(self, parent, name):
self.parent = parent
self.name = name
def __getattr__(self, key):
return _CommandLine(cmd=self.parent.cmd,
flags=self.parent.flags,
manager=self.name,
method=key,
execute=self.parent.execute)
class _CommandLine(object):
def __init__(self, cmd, flags, manager, method, execute=False):
self.cmd = cmd
self.flags = flags
self.manager = manager
self.method = method
self.execute = execute
self.kw = {}
def __call__(self, **kw):
self.kw = kw
if self.execute:
logging.debug('generated cli: %s', str(self))
out = StringIO.StringIO()
old_out = sys.stdout
sys.stdout = out
try:
main(self.to_argv())
except SystemExit as e:
pass
finally:
sys.stdout = old_out
rv = out.getvalue().strip().split('\n')[-1]
try:
loaded = json.loads(rv)
if type(loaded) in [type(list()), type(tuple())]:
return [DictWrapper(**x) for x in loaded]
elif type(loaded) is type(dict()):
return DictWrapper(**loaded)
except Exception:
logging.exception('Could not parse JSON: %s', rv)
return rv
return self
def __flags(self):
o = []
for k, v in self.flags.iteritems():
k = k.replace('_', '-')
if v is None:
o.append('--%s' % k)
else:
o.append('--%s=%s' % (k, str(v)))
return o
def __manager(self):
if self.manager.endswith('s'):
return self.manager[:-1]
return self.manager
def __kw(self):
o = []
for k, v in self.kw.iteritems():
o.append('%s=%s' % (k, str(v)))
return o
def to_argv(self):
return ([self.cmd]
+ self.__flags()
+ [self.__manager(), self.method]
+ self.__kw())
def __str__(self):
args = self.to_argv()
return ' '.join(args[:1] + ['"%s"' % x for x in args[1:]])
class DictWrapper(dict):
def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError(key)
def print_commands(cmds):
print
print "Available commands:"
@ -195,7 +335,9 @@ def run(cmd, args):
def main(argv=None, config_files=None):
CONF.reset()
args = CONF(config_files=config_files, args=argv)
if len(args) < 2:
CONF.print_help()
print_commands(CMDS)

View File

@ -196,7 +196,6 @@ class Application(BaseApplication):
creds = user_token_ref['metadata'].copy()
creds['user_id'] = user_token_ref['user'].get('id')
creds['tenant_id'] = user_token_ref['tenant'].get('id')
print creds
# Accept either is_admin or the admin role
assert self.policy_api.can_haz(context,
('is_admin:1', 'roles:admin'),

View File

@ -146,7 +146,6 @@ class TestCase(unittest.TestCase):
for tenant_id in tenants:
self.identity_api.add_user_to_tenant(tenant_id, user['id'])
setattr(self, 'user_%s' % user['id'], user_copy)
print user_copy
for role in fixtures.ROLES:
rv = self.identity_api.create_role(role['id'], role)

61
tests/test_cli.py Normal file
View File

@ -0,0 +1,61 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import nose.exc
from keystone import config
from keystone import test
from keystone.common import utils
import default_fixtures
import test_keystoneclient
CONF = config.CONF
KEYSTONECLIENT_REPO = 'git://github.com/openstack/python-keystoneclient.git'
class CliMasterTestCase(test_keystoneclient.KcMasterTestCase):
def setUp(self):
super(CliMasterTestCase, self).setUp()
from keystone import cli
self.cli = cli
def get_client(self, user_ref=None, tenant_ref=None):
if user_ref is None:
user_ref = self.user_foo
if tenant_ref is None:
for user in default_fixtures.USERS:
if user['id'] == user_ref['id']:
tenant_id = user['tenants'][0]
else:
tenant_id = tenant_ref['id']
cl = self._client(username=user_ref['name'],
password=user_ref['password'],
tenant_id=tenant_id)
gen = self.cli.CommandLineGenerator(
cmd=test.rootdir('bin', 'keystone-manage'),
execute=True,
auth_token=cl.auth_token,
endpoint=cl.management_url)
gen.auth_token = cl.auth_token
gen.management_url = cl.management_url
return gen
def test_authenticate_tenant_id_and_tenants(self):
raise nose.exc.SkipTest('N/A')
def test_authenticate_token_no_tenant(self):
raise nose.exc.SkipTest('N/A')
def test_authenticate_token_tenant_id(self):
raise nose.exc.SkipTest('N/A')
def test_authenticate_token_tenant_name(self):
raise nose.exc.SkipTest('N/A')
def test_tenant_create_update_and_delete(self):
raise nose.exc.SkipTest('cli does not support booleans yet')
def test_invalid_password(self):
raise nose.exc.SkipTest('N/A')
def test_user_create_update_delete(self):
raise nose.exc.SkipTest('cli does not support booleans yet')

View File

@ -113,7 +113,7 @@ class KcMasterTestCase(CompatTestCase):
def test_endpoints(self):
client = self.get_client()
token = client.auth_token
endpoints = client.tokens.endpoints(token)
endpoints = client.tokens.endpoints(token=token)
# FIXME(ja): this test should require the "keystone:admin" roled
# (probably the role set via --keystone_admin_role flag)
@ -125,16 +125,16 @@ class KcMasterTestCase(CompatTestCase):
test_tenant = 'new_tenant'
client = self.get_client()
tenant = client.tenants.create(test_tenant,
tenant = client.tenants.create(tenant_name=test_tenant,
description="My new tenant!",
enabled=True)
self.assertEquals(tenant.name, test_tenant)
tenant = client.tenants.get(tenant.id)
tenant = client.tenants.get(tenant_id=tenant.id)
self.assertEquals(tenant.name, test_tenant)
# TODO(devcamcar): update gives 404. why?
tenant = client.tenants.update(tenant.id,
tenant = client.tenants.update(tenant_id=tenant.id,
tenant_name='new_tenant2',
enabled=False,
description='new description')
@ -142,7 +142,7 @@ class KcMasterTestCase(CompatTestCase):
self.assertFalse(tenant.enabled)
self.assertEquals(tenant.description, 'new description')
client.tenants.delete(tenant.id)
client.tenants.delete(tenant=tenant.id)
self.assertRaises(client_exceptions.NotFound, client.tenants.get,
tenant.id)
@ -153,25 +153,26 @@ class KcMasterTestCase(CompatTestCase):
def test_tenant_add_and_remove_user(self):
client = self.get_client()
client.roles.add_user_to_tenant(self.tenant_baz['id'],
self.user_foo['id'],
self.role_useless['id'])
client.roles.add_user_to_tenant(tenant_id=self.tenant_baz['id'],
user_id=self.user_foo['id'],
role_id=self.role_useless['id'])
tenant_refs = client.tenants.list()
self.assert_(self.tenant_baz['id'] in
[x.id for x in tenant_refs])
# get the "role_refs" so we get the proper id, this is how the clients
# do it
roleref_refs = client.roles.get_user_role_refs(self.user_foo['id'])
roleref_refs = client.roles.get_user_role_refs(
user_id=self.user_foo['id'])
for roleref_ref in roleref_refs:
if (roleref_ref.roleId == self.role_useless['id'] and
roleref_ref.tenantId == self.tenant_baz['id']):
# use python's scope fall through to leave roleref_ref set
break
client.roles.remove_user_from_tenant(self.tenant_baz['id'],
self.user_foo['id'],
roleref_ref.id)
client.roles.remove_user_from_tenant(tenant_id=self.tenant_baz['id'],
user_id=self.user_foo['id'],
role_id=roleref_ref.id)
tenant_refs = client.tenants.list()
self.assert_(self.tenant_baz['id'] not in
@ -194,17 +195,19 @@ class KcMasterTestCase(CompatTestCase):
test_username = 'new_user'
client = self.get_client()
user = client.users.create(test_username, 'password', 'user1@test.com')
user = client.users.create(name=test_username,
password='password',
email='user1@test.com')
self.assertEquals(user.name, test_username)
user = client.users.get(user.id)
user = client.users.get(user=user.id)
self.assertEquals(user.name, test_username)
user = client.users.update_email(user, 'user2@test.com')
user = client.users.update_email(user=user, email='user2@test.com')
self.assertEquals(user.email, 'user2@test.com')
# NOTE(termie): update_enabled doesn't return anything, probably a bug
client.users.update_enabled(user, False)
client.users.update_enabled(user=user, enabled=False)
user = client.users.get(user.id)
self.assertFalse(user.enabled)
@ -214,12 +217,12 @@ class KcMasterTestCase(CompatTestCase):
password='password')
client.users.update_enabled(user, True)
user = client.users.update_password(user, 'password2')
user = client.users.update_password(user=user, password='password2')
test_client = self._client(username=test_username,
password='password2')
user = client.users.update_tenant(user, 'bar')
user = client.users.update_tenant(user=user, tenant='bar')
# TODO(ja): once keystonelight supports default tenant
# when you login without specifying tenant, the
# token should be scoped to tenant 'bar'
@ -237,12 +240,12 @@ class KcMasterTestCase(CompatTestCase):
def test_user_get(self):
client = self.get_client()
user = client.users.get(self.user_foo['id'])
user = client.users.get(user=self.user_foo['id'])
self.assertRaises(AttributeError, lambda: user.password)
def test_role_get(self):
client = self.get_client()
role = client.roles.get('keystone_admin')
role = client.roles.get(role='keystone_admin')
self.assertEquals(role.id, 'keystone_admin')
def test_role_create_and_delete(self):
@ -250,16 +253,16 @@ class KcMasterTestCase(CompatTestCase):
test_role = 'new_role'
client = self.get_client()
role = client.roles.create(test_role)
role = client.roles.create(name=test_role)
self.assertEquals(role.name, test_role)
role = client.roles.get(role)
role = client.roles.get(role=role.id)
self.assertEquals(role.name, test_role)
client.roles.delete(role)
client.roles.delete(role=role.id)
self.assertRaises(client_exceptions.NotFound, client.roles.get,
test_role)
role=test_role)
def test_role_list(self):
client = self.get_client()
@ -269,25 +272,26 @@ class KcMasterTestCase(CompatTestCase):
def test_roles_get_by_user(self):
client = self.get_client()
roles = client.roles.get_user_role_refs('foo')
roles = client.roles.get_user_role_refs(user_id='foo')
self.assertTrue(len(roles) > 0)
def test_ec2_credential_crud(self):
client = self.get_client()
creds = client.ec2.list(self.user_foo['id'])
creds = client.ec2.list(user_id=self.user_foo['id'])
self.assertEquals(creds, [])
cred = client.ec2.create(self.user_foo['id'], self.tenant_bar['id'])
creds = client.ec2.list(self.user_foo['id'])
cred = client.ec2.create(user_id=self.user_foo['id'],
tenant_id=self.tenant_bar['id'])
creds = client.ec2.list(user_id=self.user_foo['id'])
self.assertEquals(creds, [cred])
got = client.ec2.get(self.user_foo['id'], cred.access)
got = client.ec2.get(user_id=self.user_foo['id'], access=cred.access)
self.assertEquals(cred, got)
# FIXME(ja): need to test ec2 validation here
client.ec2.delete(self.user_foo['id'], cred.access)
creds = client.ec2.list(self.user_foo['id'])
client.ec2.delete(user_id=self.user_foo['id'], access=cred.access)
creds = client.ec2.list(user_id=self.user_foo['id'])
self.assertEquals(creds, [])
def test_ec2_credentials_list_unauthorized_user(self):
@ -329,20 +333,24 @@ class KcMasterTestCase(CompatTestCase):
test_service = 'new_service'
client = self.get_client()
service = client.services.create(test_service, 'test', 'test')
service = client.services.create(name=test_service,
service_type='test',
description='test')
self.assertEquals(service.name, test_service)
service = client.services.get(service.id)
service = client.services.get(id=service.id)
self.assertEquals(service.name, test_service)
client.services.delete(service.id)
client.services.delete(id=service.id)
self.assertRaises(client_exceptions.NotFound, client.services.get,
service.id)
id=service.id)
def test_service_list(self):
client = self.get_client()
test_service = 'new_service'
service = client.services.create(test_service, 'test', 'test')
service = client.services.create(name=test_service,
service_type='test',
description='test')
services = client.services.list()
# TODO(devcamcar): This assert should be more specific.
self.assertTrue(len(services) > 0)