From d4f2bf5fdefca433ee81075dd28be6f1b7387b50 Mon Sep 17 00:00:00 2001 From: termie Date: Tue, 24 Jan 2012 23:01:51 -0800 Subject: [PATCH] add a bunch of basic tests for the cli --- keystone/cli.py | 146 ++++++++++++++++++++++++++++++++++- keystone/common/wsgi.py | 1 - keystone/test.py | 1 - tests/test_cli.py | 61 +++++++++++++++ tests/test_keystoneclient.py | 80 ++++++++++--------- 5 files changed, 249 insertions(+), 40 deletions(-) create mode 100644 tests/test_cli.py diff --git a/keystone/cli.py b/keystone/cli.py index 814ef51304..0469ab3a65 100644 --- a/keystone/cli.py +++ b/keystone/cli.py @@ -1,8 +1,10 @@ from __future__ import absolute_import +import json import logging import os import sys +import StringIO import textwrap import cli.app @@ -19,7 +21,7 @@ config.register_cli_str('endpoint', default='http://localhost:$admin_port/v2.0', #group='ks', conf=CONF) -config.register_cli_str('auth_token', +config.register_cli_str('auth-token', default='$admin_token', #group='ks', help='asdasd', @@ -113,7 +115,29 @@ class ClientCommand(BaseApp): if CONF.id_only and getattr(resp, 'id'): print resp.id return - print resp + + if resp is None: + return + + # NOTE(termie): this is ugly but it is mostly because the keystoneclient + # code doesn't give us very serializable instance objects + if type(resp) in [type(list()), type(tuple())]: + o = [] + for r in resp: + d = {} + for k, v in sorted(r.__dict__.iteritems()): + if k[0] == '_' or k == 'manager': + continue + d[k] = v + o.append(d) + else: + o = {} + for k, v in sorted(resp.__dict__.iteritems()): + if k[0] == '_' or k == 'manager': + continue + o[k] = v + + print json.dumps(o) def print_help(self): CONF.set_usage(CONF.usage.replace( @@ -175,6 +199,122 @@ CMDS = {'db_sync': DbSync, } +class CommandLineGenerator(object): + """A keystoneclient lookalike to generate keystone-manage commands. + + One would use it like so: + + >>> gen = CommandLineGenerator(id_only=None) + >>> cl = gen.ec2.create(user_id='foo', tenant_id='foo') + >>> cl.to_argv() + ... ['keystone-manage', + '--id-only', + 'ec2', + 'create', + 'user_id=foo', + 'tenant_id=foo'] + + """ + + cmd = 'keystone-manage' + + def __init__(self, cmd=None, execute=False, **kw): + if cmd: + self.cmd = cmd + self.flags = kw + self.execute = execute + + def __getattr__(self, key): + return _Manager(self, key) + + +class _Manager(object): + def __init__(self, parent, name): + self.parent = parent + self.name = name + + def __getattr__(self, key): + return _CommandLine(cmd=self.parent.cmd, + flags=self.parent.flags, + manager=self.name, + method=key, + execute=self.parent.execute) + + +class _CommandLine(object): + def __init__(self, cmd, flags, manager, method, execute=False): + self.cmd = cmd + self.flags = flags + self.manager = manager + self.method = method + self.execute = execute + self.kw = {} + + def __call__(self, **kw): + self.kw = kw + if self.execute: + logging.debug('generated cli: %s', str(self)) + out = StringIO.StringIO() + old_out = sys.stdout + sys.stdout = out + try: + main(self.to_argv()) + except SystemExit as e: + pass + finally: + sys.stdout = old_out + rv = out.getvalue().strip().split('\n')[-1] + try: + loaded = json.loads(rv) + if type(loaded) in [type(list()), type(tuple())]: + return [DictWrapper(**x) for x in loaded] + elif type(loaded) is type(dict()): + return DictWrapper(**loaded) + except Exception: + logging.exception('Could not parse JSON: %s', rv) + return rv + return self + + def __flags(self): + o = [] + for k, v in self.flags.iteritems(): + k = k.replace('_', '-') + if v is None: + o.append('--%s' % k) + else: + o.append('--%s=%s' % (k, str(v))) + return o + + def __manager(self): + if self.manager.endswith('s'): + return self.manager[:-1] + return self.manager + + def __kw(self): + o = [] + for k, v in self.kw.iteritems(): + o.append('%s=%s' % (k, str(v))) + return o + + def to_argv(self): + return ([self.cmd] + + self.__flags() + + [self.__manager(), self.method] + + self.__kw()) + + def __str__(self): + args = self.to_argv() + return ' '.join(args[:1] + ['"%s"' % x for x in args[1:]]) + + +class DictWrapper(dict): + def __getattr__(self, key): + try: + return self[key] + except KeyError: + raise AttributeError(key) + + def print_commands(cmds): print print "Available commands:" @@ -195,7 +335,9 @@ def run(cmd, args): def main(argv=None, config_files=None): + CONF.reset() args = CONF(config_files=config_files, args=argv) + if len(args) < 2: CONF.print_help() print_commands(CMDS) diff --git a/keystone/common/wsgi.py b/keystone/common/wsgi.py index fafa969435..34caec2c4d 100644 --- a/keystone/common/wsgi.py +++ b/keystone/common/wsgi.py @@ -196,7 +196,6 @@ class Application(BaseApplication): creds = user_token_ref['metadata'].copy() creds['user_id'] = user_token_ref['user'].get('id') creds['tenant_id'] = user_token_ref['tenant'].get('id') - print creds # Accept either is_admin or the admin role assert self.policy_api.can_haz(context, ('is_admin:1', 'roles:admin'), diff --git a/keystone/test.py b/keystone/test.py index 9e1137a7a5..4265514244 100644 --- a/keystone/test.py +++ b/keystone/test.py @@ -146,7 +146,6 @@ class TestCase(unittest.TestCase): for tenant_id in tenants: self.identity_api.add_user_to_tenant(tenant_id, user['id']) setattr(self, 'user_%s' % user['id'], user_copy) - print user_copy for role in fixtures.ROLES: rv = self.identity_api.create_role(role['id'], role) diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000000..82fde86df8 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,61 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +import nose.exc + +from keystone import config +from keystone import test +from keystone.common import utils + +import default_fixtures +import test_keystoneclient + +CONF = config.CONF +KEYSTONECLIENT_REPO = 'git://github.com/openstack/python-keystoneclient.git' + + +class CliMasterTestCase(test_keystoneclient.KcMasterTestCase): + def setUp(self): + super(CliMasterTestCase, self).setUp() + from keystone import cli + self.cli = cli + + def get_client(self, user_ref=None, tenant_ref=None): + if user_ref is None: + user_ref = self.user_foo + if tenant_ref is None: + for user in default_fixtures.USERS: + if user['id'] == user_ref['id']: + tenant_id = user['tenants'][0] + else: + tenant_id = tenant_ref['id'] + + cl = self._client(username=user_ref['name'], + password=user_ref['password'], + tenant_id=tenant_id) + gen = self.cli.CommandLineGenerator( + cmd=test.rootdir('bin', 'keystone-manage'), + execute=True, + auth_token=cl.auth_token, + endpoint=cl.management_url) + gen.auth_token = cl.auth_token + gen.management_url = cl.management_url + return gen + + def test_authenticate_tenant_id_and_tenants(self): + raise nose.exc.SkipTest('N/A') + + def test_authenticate_token_no_tenant(self): + raise nose.exc.SkipTest('N/A') + + def test_authenticate_token_tenant_id(self): + raise nose.exc.SkipTest('N/A') + + def test_authenticate_token_tenant_name(self): + raise nose.exc.SkipTest('N/A') + + def test_tenant_create_update_and_delete(self): + raise nose.exc.SkipTest('cli does not support booleans yet') + def test_invalid_password(self): + raise nose.exc.SkipTest('N/A') + + def test_user_create_update_delete(self): + raise nose.exc.SkipTest('cli does not support booleans yet') diff --git a/tests/test_keystoneclient.py b/tests/test_keystoneclient.py index d38313d303..6265c17510 100644 --- a/tests/test_keystoneclient.py +++ b/tests/test_keystoneclient.py @@ -113,7 +113,7 @@ class KcMasterTestCase(CompatTestCase): def test_endpoints(self): client = self.get_client() token = client.auth_token - endpoints = client.tokens.endpoints(token) + endpoints = client.tokens.endpoints(token=token) # FIXME(ja): this test should require the "keystone:admin" roled # (probably the role set via --keystone_admin_role flag) @@ -125,16 +125,16 @@ class KcMasterTestCase(CompatTestCase): test_tenant = 'new_tenant' client = self.get_client() - tenant = client.tenants.create(test_tenant, + tenant = client.tenants.create(tenant_name=test_tenant, description="My new tenant!", enabled=True) self.assertEquals(tenant.name, test_tenant) - tenant = client.tenants.get(tenant.id) + tenant = client.tenants.get(tenant_id=tenant.id) self.assertEquals(tenant.name, test_tenant) # TODO(devcamcar): update gives 404. why? - tenant = client.tenants.update(tenant.id, + tenant = client.tenants.update(tenant_id=tenant.id, tenant_name='new_tenant2', enabled=False, description='new description') @@ -142,7 +142,7 @@ class KcMasterTestCase(CompatTestCase): self.assertFalse(tenant.enabled) self.assertEquals(tenant.description, 'new description') - client.tenants.delete(tenant.id) + client.tenants.delete(tenant=tenant.id) self.assertRaises(client_exceptions.NotFound, client.tenants.get, tenant.id) @@ -153,25 +153,26 @@ class KcMasterTestCase(CompatTestCase): def test_tenant_add_and_remove_user(self): client = self.get_client() - client.roles.add_user_to_tenant(self.tenant_baz['id'], - self.user_foo['id'], - self.role_useless['id']) + client.roles.add_user_to_tenant(tenant_id=self.tenant_baz['id'], + user_id=self.user_foo['id'], + role_id=self.role_useless['id']) tenant_refs = client.tenants.list() self.assert_(self.tenant_baz['id'] in [x.id for x in tenant_refs]) # get the "role_refs" so we get the proper id, this is how the clients # do it - roleref_refs = client.roles.get_user_role_refs(self.user_foo['id']) + roleref_refs = client.roles.get_user_role_refs( + user_id=self.user_foo['id']) for roleref_ref in roleref_refs: if (roleref_ref.roleId == self.role_useless['id'] and roleref_ref.tenantId == self.tenant_baz['id']): # use python's scope fall through to leave roleref_ref set break - client.roles.remove_user_from_tenant(self.tenant_baz['id'], - self.user_foo['id'], - roleref_ref.id) + client.roles.remove_user_from_tenant(tenant_id=self.tenant_baz['id'], + user_id=self.user_foo['id'], + role_id=roleref_ref.id) tenant_refs = client.tenants.list() self.assert_(self.tenant_baz['id'] not in @@ -194,17 +195,19 @@ class KcMasterTestCase(CompatTestCase): test_username = 'new_user' client = self.get_client() - user = client.users.create(test_username, 'password', 'user1@test.com') + user = client.users.create(name=test_username, + password='password', + email='user1@test.com') self.assertEquals(user.name, test_username) - user = client.users.get(user.id) + user = client.users.get(user=user.id) self.assertEquals(user.name, test_username) - user = client.users.update_email(user, 'user2@test.com') + user = client.users.update_email(user=user, email='user2@test.com') self.assertEquals(user.email, 'user2@test.com') # NOTE(termie): update_enabled doesn't return anything, probably a bug - client.users.update_enabled(user, False) + client.users.update_enabled(user=user, enabled=False) user = client.users.get(user.id) self.assertFalse(user.enabled) @@ -214,12 +217,12 @@ class KcMasterTestCase(CompatTestCase): password='password') client.users.update_enabled(user, True) - user = client.users.update_password(user, 'password2') + user = client.users.update_password(user=user, password='password2') test_client = self._client(username=test_username, password='password2') - user = client.users.update_tenant(user, 'bar') + user = client.users.update_tenant(user=user, tenant='bar') # TODO(ja): once keystonelight supports default tenant # when you login without specifying tenant, the # token should be scoped to tenant 'bar' @@ -237,12 +240,12 @@ class KcMasterTestCase(CompatTestCase): def test_user_get(self): client = self.get_client() - user = client.users.get(self.user_foo['id']) + user = client.users.get(user=self.user_foo['id']) self.assertRaises(AttributeError, lambda: user.password) def test_role_get(self): client = self.get_client() - role = client.roles.get('keystone_admin') + role = client.roles.get(role='keystone_admin') self.assertEquals(role.id, 'keystone_admin') def test_role_create_and_delete(self): @@ -250,16 +253,16 @@ class KcMasterTestCase(CompatTestCase): test_role = 'new_role' client = self.get_client() - role = client.roles.create(test_role) + role = client.roles.create(name=test_role) self.assertEquals(role.name, test_role) - role = client.roles.get(role) + role = client.roles.get(role=role.id) self.assertEquals(role.name, test_role) - client.roles.delete(role) + client.roles.delete(role=role.id) self.assertRaises(client_exceptions.NotFound, client.roles.get, - test_role) + role=test_role) def test_role_list(self): client = self.get_client() @@ -269,25 +272,26 @@ class KcMasterTestCase(CompatTestCase): def test_roles_get_by_user(self): client = self.get_client() - roles = client.roles.get_user_role_refs('foo') + roles = client.roles.get_user_role_refs(user_id='foo') self.assertTrue(len(roles) > 0) def test_ec2_credential_crud(self): client = self.get_client() - creds = client.ec2.list(self.user_foo['id']) + creds = client.ec2.list(user_id=self.user_foo['id']) self.assertEquals(creds, []) - cred = client.ec2.create(self.user_foo['id'], self.tenant_bar['id']) - creds = client.ec2.list(self.user_foo['id']) + cred = client.ec2.create(user_id=self.user_foo['id'], + tenant_id=self.tenant_bar['id']) + creds = client.ec2.list(user_id=self.user_foo['id']) self.assertEquals(creds, [cred]) - got = client.ec2.get(self.user_foo['id'], cred.access) + got = client.ec2.get(user_id=self.user_foo['id'], access=cred.access) self.assertEquals(cred, got) # FIXME(ja): need to test ec2 validation here - client.ec2.delete(self.user_foo['id'], cred.access) - creds = client.ec2.list(self.user_foo['id']) + client.ec2.delete(user_id=self.user_foo['id'], access=cred.access) + creds = client.ec2.list(user_id=self.user_foo['id']) self.assertEquals(creds, []) def test_ec2_credentials_list_unauthorized_user(self): @@ -329,20 +333,24 @@ class KcMasterTestCase(CompatTestCase): test_service = 'new_service' client = self.get_client() - service = client.services.create(test_service, 'test', 'test') + service = client.services.create(name=test_service, + service_type='test', + description='test') self.assertEquals(service.name, test_service) - service = client.services.get(service.id) + service = client.services.get(id=service.id) self.assertEquals(service.name, test_service) - client.services.delete(service.id) + client.services.delete(id=service.id) self.assertRaises(client_exceptions.NotFound, client.services.get, - service.id) + id=service.id) def test_service_list(self): client = self.get_client() test_service = 'new_service' - service = client.services.create(test_service, 'test', 'test') + service = client.services.create(name=test_service, + service_type='test', + description='test') services = client.services.list() # TODO(devcamcar): This assert should be more specific. self.assertTrue(len(services) > 0)