Refactored manage.py to be both testable and useful for testing

- Added keystone.test.functional.test_core_api, with example
- Also revised imports in functional tests

Change-Id: I2af73d6978c44944554d63283c41e43cc8aca214
This commit is contained in:
Dolph Mathews 2011-08-18 14:59:27 -05:00
parent d43876ce8e
commit 902497c84c
11 changed files with 563 additions and 482 deletions

View File

@ -1,459 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Keystone Identity Server - CLI Management Interface
"""
import datetime
import logging
import optparse
import sys
import tools.tracer # @UnusedImport # module runs on import
import keystone
from keystone.common import config
import keystone.backends as db
import keystone.backends.api as db_api
import keystone.backends.models as db_models
class RaisingOptionParser(optparse.OptionParser):
def error(self, msg):
self.print_usage(sys.stderr)
raise optparse.OptParseError(msg)
def parse_args(args=None):
usage = "usage: %prog [options] type command [id [attributes]]"
# Initialize a parser for our configuration paramaters
parser = RaisingOptionParser(usage, version='%%prog %s'
% keystone.version())
_common_group = config.add_common_options(parser)
config.add_log_options(parser)
# Parse command-line and load config
(options, args) = config.parse_options(parser, args)
_config_file, conf = config.load_paste_config('admin', options, args)
# Set things up to run the command
debug = options.get('debug') or conf.get('debug', False)
debug = debug in [True, "True", "1"]
verbose = options.get('verbose') or conf.get('verbose', False)
verbose = verbose in [True, "True", "1"]
if debug or verbose:
_config_file = config.find_config_file(options, args)
config.setup_logging(options, conf)
db.configure_backends(conf.global_conf)
return args
def process(*args):
"""
Usage: keystone-manage [options] type command [id [attributes]]
type : role, tenant, user, token, endpoint, endpointTemplates
command : add, list, disable, delete, grant, revoke
id : name or id
attributes : depending on type...
users : password, tenant
tokens : user, tenant, expiration
role list [tenant] will list roles granted on that tenant
options
-c | --config-file : config file to use
-d | --debug : debug mode
Example: keystone-manage user add Admin P@ssw0rd
"""
# Check arguments
if len(args) == 0:
raise optparse.OptParseError(
'No object type specified for first argument')
object_type = args[0]
if object_type not in ['user', 'tenant', 'role', 'service',
'endpointTemplates', 'token', 'endpoint', 'credentials']:
raise optparse.OptParseError(
'%s is not a supported object type' % object_type)
if len(args) == 1:
raise optparse.OptParseError(
'No command specified for second argument')
command = args[1]
if command not in ['add', 'list', 'disable', 'delete', 'grant', 'revoke']:
raise optparse.OptParseError('add, disable, delete, and list are the '
'only supported commands (right now)')
if len(args) == 2:
if command != 'list':
raise optparse.OptParseError('No id specified for third argument')
if len(args) > 2:
object_id = args[2]
if object_type == "user":
if command == "add":
if len(args) < 4:
raise optparse.OptParseError(
'No password specified for fourth argument')
password = args[3]
try:
object = db_models.User()
object.id = object_id
object.password = password
object.enabled = True
if len(args) > 4:
tenant = args[4]
object.tenant_id = tenant
db_api.USER.create(object)
print "SUCCESS: User %s created." % object.id
except:
raise Exception("Failed to create user %s" % (object_id,),
sys.exc_info())
return
elif command == "disable":
try:
object = db_api.USER.get(object_id)
if object == None:
raise IndexError("User %s not found" % object_id)
object.enabled = False
db_api.USER.update(object_id, object)
print "SUCCESS: User %s disabled." % object.id
except:
raise Exception("Failed to disable user %s" % (object_id,),
sys.exc_info())
return
elif command == "list":
try:
if len(args) > 2:
tenant = args[2]
objects = db_api.USER.get_by_tenant(tenant)
if objects == None:
raise IndexError("Users not found")
print 'id', 'enabled'
print '-' * 20
for row in objects:
print row.id, row.enabled
else:
objects = db_api.USER.get_all()
if objects == None:
raise IndexError("Users not found")
print 'id', 'enabled', 'tenant'
print '-' * 20
for row in objects:
print row.id, row.enabled, row.tenant_id
except:
raise Exception("Error getting all users", sys.exc_info())
return
elif object_type == "tenant":
if command == "add":
try:
object = db_models.Tenant()
object.id = object_id
object.enabled = True
db_api.TENANT.create(object)
print "SUCCESS: Tenant %s created." % object.id
return
except:
raise Exception("Failed to create tenant %s" % (object_id,),
sys.exc_info())
elif command == "list":
try:
objects = db_api.TENANT.get_all()
if objects == None:
raise IndexError("Tenants not found")
print 'tenant', 'enabled'
print '-' * 20
for row in objects:
print row.id, row.enabled
except:
raise Exception("Error getting all tenants", sys.exc_info())
return
elif command == "disable":
try:
object = db_api.TENANT.get(object_id)
if object == None:
raise IndexError("Tenant %s not found" % object_id)
object.enabled = False
db_api.TENANT.update(object_id, object)
print "SUCCESS: Tenant %s disabled." % object.id
except:
raise Exception("Failed to disable tenant %s" % (object_id,),
sys.exc_info())
return
elif object_type == "role":
if command == "add":
try:
object = db_models.Role()
object.id = object_id
db_api.ROLE.create(object)
print "SUCCESS: Role %s created successfully." % object.id
return
except:
raise Exception("Failed to create role %s" % (object_id,),
sys.exc_info())
elif command == "list":
if len(args) == 3:
tenant = args[2]
try:
objects = db_api.TENANT.get_role_assignments(tenant)
if objects == None:
raise IndexError("Assignments not found")
print 'Role assignments for tenant %s' % tenant
print 'User', 'Role'
print '-' * 20
for row in objects:
print row.user_id, row.role_id
except:
raise Exception("Error getting all role assignments for %s"
% (tenant,), sys.exc_info())
return
else:
tenant = None
try:
objects = db_api.ROLE.get_all()
if objects == None:
raise IndexError("Roles not found")
print 'All roles'
print 'Role'
print '-' * 20
for row in objects:
print row.id
except:
raise Exception("Error getting all roles", sys.exc_info())
return
elif command == "grant":
if len(args) < 4:
raise optparse.OptParseError("Missing arguments: role grant "
"'role' 'user' 'tenant (optional)'")
user = args[3]
if len(args) > 4:
tenant = args[4]
else:
tenant = None
try:
object = db_models.UserRoleAssociation()
object.role_id = object_id
object.user_id = user
if tenant != None:
object.tenant_id = tenant
db_api.USER.user_role_add(object)
print("SUCCESS: Granted %s the %s role on %s." %
(object.user_id, object.role_id, object.tenant_id))
except:
raise Exception("Failed to grant role %s to %s on %s" %
(object_id, user, tenant), sys.exc_info())
return
elif object_type == "endpointTemplates":
if command == "add":
if len(args) < 9:
raise optparse.OptParseError("Missing arguments: "
"endpointTemplates add 'region' 'service' 'publicURL' "
"'adminURL' 'internalURL' 'enabled' 'global'")
region = args[2]
service = args[3]
public_url = args[4]
admin_url = args[5]
internal_url = args[6]
enabled = args[7]
is_global = args[8]
try:
object = db_models.EndpointTemplates()
object.region = region
object.service = service
object.public_url = public_url
object.admin_url = admin_url
object.internal_url = internal_url
object.enabled = enabled
object.is_global = is_global
object = db_api.ENDPOINT_TEMPLATE.create(object)
print("SUCCESS: Created EndpointTemplates for %s pointing "
"to %s." % (object.service, object.public_url))
return
except:
raise Exception("Failed to create EndpointTemplates for %s" %
(service,), sys.exc_info())
elif command == "list":
if len(args) == 3:
tenant = args[2]
try:
objects = db_api.ENDPOINT_TEMPLATE.endpoint_get_by_tenant(
tenant)
if objects == None:
raise IndexError("URLs not found")
print 'Endpoints for tenant %s' % tenant
print 'service', 'region', 'Public URL'
print '-' * 30
for row in objects:
print row.service, row.region, row.public_url
except:
raise Exception("Error getting all endpoints for %s" %
(tenant,), sys.exc_info())
return
else:
tenant = None
try:
objects = db_api.ENDPOINT_TEMPLATE.get_all()
if objects == None:
raise IndexError("URLs not found")
print 'All EndpointTemplates'
print 'service', 'region', 'Public URL'
print '-' * 20
for row in objects:
print row.service, row.region, row.public_url
except:
raise Exception("Error getting all EndpointTemplates",
sys.exc_info())
return
elif object_type == "endpoint":
if command == "add":
if len(args) < 4:
raise optparse.OptParseError("Missing arguments: endPoint add "
"tenant endPointTemplate'")
tenant_id = args[2]
endpoint_template_id = args[3]
try:
object = db_models.Endpoints()
object.tenant_id = tenant_id
object.endpoint_template_id = endpoint_template_id
object = db_api.ENDPOINT_TEMPLATE.endpoint_add(object)
print("SUCCESS: Endpoint %s added to tenant %s." %
(endpoint_template_id, tenant_id))
return
except:
raise Exception("Failed to create Endpoint", sys.exc_info())
elif object_type == "token":
if command == "add":
if len(args) < 6:
raise optparse.OptParseError('Creating a token requires a '
'token id, user, tenant, and expiration')
try:
object = db_models.Token()
object.id = object_id
object.user_id = args[3]
object.tenant_id = args[4]
tuple_time = datetime.datetime.strptime(args[5]
.replace("-", ""),
"%Y%m%dT%H:%M")
object.expires = tuple_time
db_api.TOKEN.create(object)
print "SUCCESS: Token %s created." % object.id
return
except:
raise Exception("Failed to create token %s" % (object_id,),
sys.exc_info())
elif command == "list":
try:
objects = db_api.TOKEN.get_all()
if objects == None:
raise IndexError("Tokens not found")
print 'token', 'user', 'expiration', 'tenant'
print '-' * 20
for row in objects:
print row.id, row.user_id, row.expires, row.tenant_id
except:
raise Exception("Error getting all tokens", sys.exc_info())
return
elif command == "delete":
try:
object = db_api.TOKEN.get(object_id)
if object == None:
raise IndexError("Token %s not found" % object_id)
else:
db_api.TOKEN.delete(object_id)
print 'SUCCESS: Token %s deleted.' % object_id
except:
raise Exception("Failed to delete token %s" % (object_id,),
sys.exc_info())
return
elif object_type == "service":
if command == "add":
try:
object = db_models.Service()
object.id = object_id
db_api.SERVICE.create(object)
print "SUCCESS: Service %s created successfully." % \
(object.id,)
return
except:
raise Exception("Failed to create Service %s" % \
(object_id,), sys.exc_info())
elif command == "list":
try:
objects = db_api.SERVICE.get_all()
if objects == None:
raise IndexError("Services not found")
print objects
print 'All Services'
print 'Service'
print '-' * 20
for row in objects:
print row.id
except:
raise Exception("Error getting all services", sys.exc_info())
elif object_type == "credentials":
if command == "add":
if len(args) < 6:
raise optparse.OptParseError('Creating a credentials requires '
'a type, key, secret, and tenant_id (id is user_id)')
try:
object = db_models.Token()
object.user_id = object_id
object.type = args[3]
object.key = args[4]
object.secret = args[5]
if len(args) == 7:
object.tenant_id = args[6]
result = db_api.CREDENTIALS.create(object)
print "SUCCESS: Credentials %s created." % result.id
return
except:
raise Exception("Failed to create credentials %s" %
(object_id,), sys.exc_info())
# Command not handled
print ("ERROR: %s %s not yet supported" % (object_type, command))
def main():
try:
process(*parse_args())
except optparse.OptParseError as exc:
print >> sys.stderr, exc
sys.exit(2)
except Exception as exc:
try:
info = exc.args[1]
except IndexError:
print "ERROR: %s" % (exc,)
logging.error(str(exc))
else:
print "ERROR: %s: %s" % (exc.args[0], info)
logging.error(exc.args[0], exc_info=info)
sys.exit(1)
if __name__ == '__main__':
main()

246
keystone/manage/__init__.py Normal file
View File

@ -0,0 +1,246 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Keystone Identity Server - CLI Management Interface
"""
import sys
import logging
import optparse
import keystone
from keystone.common import config
from keystone.manage.api import *
class RaisingOptionParser(optparse.OptionParser):
def error(self, msg):
self.print_usage(sys.stderr)
raise optparse.OptParseError(msg)
def parse_args(args=None):
usage = "usage: %prog [options] type command [id [attributes]]"
# Initialize a parser for our configuration paramaters
parser = RaisingOptionParser(usage, version='%%prog %s'
% keystone.version())
_common_group = config.add_common_options(parser)
config.add_log_options(parser)
# Parse command-line and load config
(options, args) = config.parse_options(parser, args)
_config_file, conf = config.load_paste_config('admin', options, args)
config.setup_logging(options, conf)
db.configure_backends(conf.global_conf)
return args
def process(*args):
"""
Usage: keystone-manage [options] type command [id [attributes]]
type : role, tenant, user, token, endpoint, endpointTemplates
command : add, list, disable, delete, grant, revoke
id : name or id
attributes : depending on type...
users : password, tenant
tokens : user, tenant, expiration
role list [tenant] will list roles granted on that tenant
options
-c | --config-file : config file to use
-d | --debug : debug mode
Example: keystone-manage user add Admin P@ssw0rd
"""
# Check arguments
if len(args) == 0:
raise optparse.OptParseError(
'No obj type specified for first argument')
object_type = args[0]
if object_type not in ['user', 'tenant', 'role', 'service',
'endpointTemplates', 'token', 'endpoint', 'credentials']:
raise optparse.OptParseError(
'%s is not a supported obj type' % object_type)
if len(args) == 1:
raise optparse.OptParseError(
'No command specified for second argument')
command = args[1]
if command not in ['add', 'list', 'disable', 'delete', 'grant', 'revoke']:
raise optparse.OptParseError('add, disable, delete, and list are the '
'only supported commands (right now)')
if len(args) == 2:
if command != 'list':
raise optparse.OptParseError('No id specified for third argument')
if len(args) > 2:
object_id = args[2]
# Helper functions
def require_args(args, min, msg):
"""Ensure there are at least `min` arguments"""
if len(args) < min:
raise optparse.OptParseError(msg)
optional_arg = (lambda x: len(args) > x and args[x] or None)
def print_table(header_row, rows):
"""Prints a lists of lists as table in a human readable format"""
print "\t".join(header_row)
print '-' * 79
rows = [[str(col) for col in row] for row in rows]
print "\n".join(["\t".join(row) for row in rows])
# Execute command
if (object_type, command) == ('user', 'add'):
require_args(args, 4, 'No password specified for fourth argument')
if add_user(id=object_id, password=args[3], tenant=optional_arg(4)):
print "SUCCESS: User %s created." % object_id
elif (object_type, command) == ('user', 'disable'):
if disable_user(id=object_id):
print "SUCCESS: User %s disabled." % object_id
elif (object_type, command) == ('user', 'list'):
print_table(('id', 'enabled', 'tenant'), list_users())
elif (object_type, command) == ('tenant', 'add'):
if add_tenant(id=object_id):
print "SUCCESS: Tenant %s created." % object_id
elif (object_type, command) == ('tenant', 'list'):
print_table(('tenant', 'enabled'), list_tenants())
elif (object_type, command) == ('tenant', 'disable'):
if disable_tenant(id=object_id):
print "SUCCESS: Tenant %s disabled." % object_id
elif (object_type, command) == ('role', 'add'):
if add_role(id=object_id):
print "SUCCESS: Role %s created successfully." % object_id
elif (object_type, command) == ('role', 'list'):
tenant = optional_arg(2)
if tenant:
# print with users
print 'Role assignments for tenant %s' % tenant
print_table(('User', 'Role'), list_roles(tenant=tenant))
else:
# print without tenants
print_table(('id'), list_roles())
elif (object_type, command) == ('role', 'grant'):
require_args(args, 4, "Missing arguments: role grant 'role' 'user' "
"'tenant (optional)'")
tenant = len(args) > 4 and args[4] or None
if grant_role(object_id, args[3], tenant):
print("SUCCESS: Granted %s the %s role on %s." %
(object_id, args[3], tenant))
elif (object_type, command) == ('endpointTemplates', 'add'):
require_args(args, 9, "Missing arguments: endpointTemplates add "
"'region' 'service' 'publicURL' 'adminURL' 'internalURL' "
"'enabled' 'global'")
if add_endpoint_template(region=args[2], service=args[3],
public_url=args[4], admin_url=args[5], internal_url=args[6],
enabled=args[7], is_global=args[8]):
print("SUCCESS: Created EndpointTemplates for %s pointing to %s." %
(args[3], args[4]))
elif (object_type, command) == ('endpointTemplates', 'list'):
tenant = optional_arg(2)
if tenant:
print 'Endpoints for tenant %s' % tenant
print_table(('service', 'region', 'Public URL'),
list_tenant_endpoints())
else:
print 'All EndpointTemplates'
print_table(('service', 'region', 'Public URL'),
list_endpoint_templates())
elif (object_type, command) == ('endpoint', 'add'):
require_args(args, 4, "Missing arguments: endPoint add tenant "
"endPointTemplate")
if add_endpoint(tenant=args[2], endpoint_template=args[3]):
print("SUCCESS: Endpoint %s added to tenant %s." %
(args[3], args[2]))
elif (object_type, command) == ('token', 'add'):
require_args(args, 6, 'Creating a token requires a token id, user, '
'tenant, and expiration')
if add_token(token=object_id, user=args[3], tenant=args[4],
expires=args[5]):
print "SUCCESS: Token %s created." % (object_id,)
elif (object_type, command) == ('token', 'list'):
print_table(('token', 'user', 'expiration', 'tenant'), list_tokens())
elif (object_type, command) == ('token', 'delete'):
if delete_token(token=object_id):
print 'SUCCESS: Token %s deleted.' % (object_id,)
elif (object_type, command) == ('service', 'add'):
if add_service(service=object_id):
print "SUCCESS: Service %s created successfully." % (object_id,)
elif (object_type, command) == ('service', 'list'):
print_table(('service'), list_services())
elif (object_type, command) == ('credentials', 'add'):
require_args(args, 6, 'Creating a credentials requires a type, key, '
'secret, and tenant_id (id is user_id)')
if add_credentials(user=object_id, type=args[3], key=args[4],
secrete=args[5], tenant=optional_arg(6)):
print "SUCCESS: Credentials %s created." % result.id
else:
# Command not handled
print ("ERROR: unrecognized command %s %s" % (object_type, command))
def main():
try:
process(*parse_args())
except optparse.OptParseError as exc:
print >> sys.stderr, exc
sys.exit(2)
except Exception as exc:
try:
info = exc.args[1]
except IndexError:
print "ERROR: %s" % (exc,)
logging.error(str(exc))
else:
print "ERROR: %s: %s" % (exc.args[0], info)
logging.error(exc.args[0], exc_info=info)
sys.exit(1)
if __name__ == '__main__':
main()

245
keystone/manage/api.py Normal file
View File

@ -0,0 +1,245 @@
import sys
import datetime
import keystone.backends as db
import keystone.backends.api as db_api
import keystone.backends.models as db_models
def add_user(id, password, tenant=None):
try:
obj = db_models.User()
obj.id = id
obj.password = password
obj.enabled = True
obj.tenant_id = tenant
db_api.USER.create(obj)
return True
except:
raise Exception("Failed to create user %s" % (id,), sys.exc_info())
def disable_user(id):
try:
obj = db_api.USER.get(id)
if obj == None:
raise IndexError("User %s not found" % id)
obj.enabled = False
db_api.USER.update(id, obj)
except:
raise Exception("Failed to disable user %s" % (id,),
sys.exc_info())
def list_users():
try:
objects = db_api.USER.get_all()
if objects == None:
raise IndexError("Users not found")
return [[o.id, o.enabled, o.tenant_id] for o in objects]
except:
raise Exception("Error getting all users", sys.exc_info())
def add_tenant(id):
try:
obj = db_models.Tenant()
obj.id = id
obj.enabled = True
db_api.TENANT.create(obj)
return True
except:
raise Exception("Failed to create tenant %s" % (id,), sys.exc_info())
def list_tenants():
try:
objects = db_api.TENANT.get_all()
if objects == None:
raise IndexError("Tenants not found")
return [[o.id, o.enabled] for o in objects]
except:
raise Exception("Error getting all tenants", sys.exc_info())
def disable_tenant(id):
try:
obj = db_api.TENANT.get(id)
if obj == None:
raise IndexError("Tenant %s not found" % id)
obj.enabled = False
db_api.TENANT.update(id, obj)
return True
except:
raise Exception("Failed to disable tenant %s" % (id,), sys.exc_info())
def add_role(id):
try:
obj = db_models.Role()
obj.id = id
db_api.ROLE.create(obj)
return True
except:
raise Exception("Failed to create role %s" % (id,), sys.exc_info())
def list_role_assignments(tenant):
try:
objects = db_api.TENANT.get_role_assignments(tenant)
if objects == None:
raise IndexError("Assignments not found")
return [[o.user_id, o.role_id] for o in objects]
except:
raise Exception("Error getting all role assignments for %s"
% (tenant,), sys.exc_info())
def list_roles(tenant=None):
if tenant:
return list_role_assignments(tenant)
else:
try:
objects = db_api.ROLE.get_all()
if objects == None:
raise IndexError("Roles not found")
return [[o.id] for o in objects]
except:
raise Exception("Error getting all roles", sys.exc_info())
def grant_role(role, user, tenant=None):
"""Grants `role` to `user` (and optionally, on `tenant`)"""
try:
obj = db_models.UserRoleAssociation()
obj.role_id = role
obj.user_id = user
obj.tenant_id = tenant
db_api.USER.user_role_add(obj)
return True
except:
raise Exception("Failed to grant role %s to %s on %s" %
(role, user, tenant), sys.exc_info())
def add_endpoint_template(region, service, public_url, admin_url, internal_url,
enabled, is_global):
try:
obj = db_models.EndpointTemplates()
obj.region = region
obj.service = service
obj.public_url = public_url
obj.admin_url = admin_url
obj.internal_url = internal_url
obj.enabled = enabled
obj.is_global = is_global
obj = db_api.ENDPOINT_TEMPLATE.create(obj)
return True
except:
raise Exception("Failed to create EndpointTemplates for %s" %
(service,), sys.exc_info())
def list_tenant_endpoints(tenant):
try:
objects = db_api.ENDPOINT_TEMPLATE.endpoint_get_by_tenant(tenant)
if objects == None:
raise IndexError("URLs not found")
return [[o.service, o.region, o.public_url] for o in objects]
except:
raise Exception("Error getting all endpoints for %s" %
(tenant,), sys.exc_info())
def list_endpoint_templates():
try:
objects = db_api.ENDPOINT_TEMPLATE.get_all()
if objects == None:
raise IndexError("URLs not found")
return [[o.service, o.region, o.public_url] for o in objects]
except:
raise Exception("Error getting all EndpointTemplates",
sys.exc_info())
def add_endpoint(tenant, endpoint_template):
try:
obj = db_models.Endpoints()
obj.tenant_id = tenant
obj.endpoint_template_id = endpoint_template
db_api.ENDPOINT_TEMPLATE.endpoint_add(obj)
return obj
except:
raise Exception("Failed to create Endpoint", sys.exc_info())
def add_token(token, user, tenant, expires):
try:
obj = db_models.Token()
obj.id = token
obj.user_id = user
obj.tenant_id = tenant
obj.expires = datetime.datetime.strptime(expires.replace("-", ""),
"%Y%m%dT%H:%M")
db_api.TOKEN.create(obj)
return obj
except:
raise Exception("Failed to create token %s" % (token,), sys.exc_info())
def list_tokens():
try:
objects = db_api.TOKEN.get_all()
if objects == None:
raise IndexError("Tokens not found")
return [[o.id, o.user_id, o.expires, o.tenant_id] for o in objects]
except:
raise Exception("Error getting all tokens", sys.exc_info())
def delete_token(token):
try:
obj = db_api.TOKEN.get(token)
if obj == None:
raise IndexError("Token %s not found" % (token,))
db_api.TOKEN.delete(token)
return True
except:
raise Exception("Failed to delete token %s" % (token,),
sys.exc_info())
def add_service(service):
try:
obj = db_models.Service()
obj.id = service
db_api.SERVICE.create(obj)
return obj
except:
raise Exception("Failed to create Service %s" % (service,),
sys.exc_info())
def list_services():
try:
objects = db_api.SERVICE.get_all()
if objects == None:
raise IndexError("Services not found")
return [[o.id] for o in objects]
except:
raise Exception("Error getting all services", sys.exc_info())
def add_credentials(user, type, key, secrete, tenant=None):
try:
obj = db_models.Token()
obj.user_id = user
obj.type = type
obj.key = key
obj.secret = secret
obj.tenant_id = tenant
db_api.CREDENTIALS.create(obj)
return obj
except:
raise Exception("Failed to create credentials %s" % (object_id,),
sys.exc_info())

View File

@ -73,7 +73,7 @@ class KeystoneTest(object):
[os.path.join(BASE_DIR, 'bin/keystone'), '-c', self.conf_fp.name])
# blatent hack.
time.sleep(1)
time.sleep(2)
if self.server.poll() is not None:
raise RuntimeError('Failed to start server')

View File

@ -1,8 +1,8 @@
import unittest
from common import KeystoneTestCase
from keystone.test.functional import common
class TestAdminAuthentication(KeystoneTestCase):
class TestAdminAuthentication(common.KeystoneTestCase):
"""Test admin-side user authentication"""
def setUp(self):
@ -20,11 +20,11 @@ class TestAdminAuthentication(KeystoneTestCase):
self.assertTrue(r.json['auth']['token']['expires'])
class TestAdminAuthenticationNegative(KeystoneTestCase):
class TestAdminAuthenticationNegative(common.KeystoneTestCase):
"""Negative test admin-side user authentication"""
user_id = KeystoneTestCase._uuid()
user_id2 = KeystoneTestCase._uuid()
user_id = common.KeystoneTestCase._uuid()
user_id2 = common.KeystoneTestCase._uuid()
admin_token_backup = None
def test_service_token_as_admin_token(self):
@ -69,10 +69,10 @@ class TestAdminAuthenticationNegative(KeystoneTestCase):
self.admin_request(method='DELETE', path='/users/%s' % self.user_id)
class TestServiceAuthentication(KeystoneTestCase):
class TestServiceAuthentication(common.KeystoneTestCase):
"""Test service-side user authentication"""
user_id = KeystoneTestCase._uuid()
user_id = common.KeystoneTestCase._uuid()
def setUp(self):
super(TestServiceAuthentication, self).setUp()

View File

@ -0,0 +1,49 @@
import unittest
from keystone.test.functional import common
from keystone import manage
class TestCoreServiceApi(common.KeystoneTestCase):
"""Tests core Keystone Service API"""
user = None
def setUp(self):
self.user = common.KeystoneTestCase._uuid()
# manage.parse_args(['--config-file=etc/keystone.conf'])
# manage.api.add_user(self.user, 'awe4tya46')
def tearDown(self):
pass
def testPostTokens(self):
pass
def testGetTenantsRequiresAuthentication(self):
pass
def testAuthenticateWithoutTenant(self):
pass
def testAuthenticateWithTenant(self):
pass
def testAuthenticateWithManyTenants(self):
pass
class TestCoreAdminApi(common.KeystoneTestCase):
"""Tests core Keystone Service API"""
def setUp(self):
pass
def tearDown(self):
pass
def testName(self):
pass
if __name__ == "__main__":
unittest.main()

View File

@ -1,8 +1,8 @@
import unittest2 as unittest
from common import KeystoneTestCase
from keystone.test.functional import common
class TestExtensions(KeystoneTestCase):
class TestExtensions(common.KeystoneTestCase):
def test_extensions_json(self):
r = self.service_request(path='/extensions.json',
assert_status=200)

View File

@ -1,8 +1,8 @@
import unittest
from common import KeystoneTestCase
from keystone.test.functional import common
class TestExtensions(KeystoneTestCase):
class TestExtensions(common.KeystoneTestCase):
def test_extensions_json(self):
r = self.service_request(path='/extensions.json')
self.assertTrue('json' in r.getheader('Content-Type'))
@ -12,7 +12,7 @@ class TestExtensions(KeystoneTestCase):
self.assertTrue('xml' in r.getheader('Content-Type'))
class TestAdminExtensions(KeystoneTestCase):
class TestAdminExtensions(common.KeystoneTestCase):
def test_extensions_json(self):
r = self.admin_request(path='/extensions.json')
self.assertTrue('json' in r.getheader('Content-Type'))

View File

@ -1,12 +1,12 @@
import unittest
from common import KeystoneTestCase
from keystone.test.functional import common
class TestIssue85(KeystoneTestCase):
class TestIssue85(common.KeystoneTestCase):
"""Illustrates github issue #85"""
tenant_id = KeystoneTestCase._uuid()
user_id = KeystoneTestCase._uuid()
tenant_id = common.KeystoneTestCase._uuid()
user_id = common.KeystoneTestCase._uuid()
def setUp(self):
super(TestIssue85, self).setUp()

View File

@ -1,8 +1,8 @@
import unittest
from common import KeystoneTestCase
from keystone.test.functional import common
class TestUrlHandling(KeystoneTestCase):
class TestUrlHandling(common.KeystoneTestCase):
"""Tests API's global URL handling behaviors"""
def test_optional_trailing_slash(self):
@ -12,7 +12,7 @@ class TestUrlHandling(KeystoneTestCase):
self.assertEqual(r1.read(), r2.read())
class TestContentTypes(KeystoneTestCase):
class TestContentTypes(common.KeystoneTestCase):
"""Tests API's Content-Type handling"""
def test_default_content_type(self):

View File

@ -1,8 +1,8 @@
import unittest
from common import KeystoneTestCase
from keystone.test.functional import common
class TestStaticFiles(KeystoneTestCase):
class TestStaticFiles(common.KeystoneTestCase):
def test_pdf_contract(self):
r = self.service_request(path='/identitydevguide.pdf')
self.assertTrue('pdf' in r.getheader('Content-Type'))
@ -44,7 +44,7 @@ class TestStaticFiles(KeystoneTestCase):
self.assertTrue('css' in r.getheader('Content-Type'))
class TestAdminStaticFiles(KeystoneTestCase):
class TestAdminStaticFiles(common.KeystoneTestCase):
def test_pdf_contract(self):
r = self.admin_request(path='/identityadminguide.pdf')
self.assertTrue('pdf' in r.getheader('Content-Type'))