Guang Yee 2011-12-26 15:44:56 -08:00 committed by Ziad Sawalha
parent f9ce9115a5
commit fafc25b3b1
30 changed files with 703 additions and 44 deletions

View File

@ -124,7 +124,7 @@ Here is an example showing how you can manually start the ``keystone-auth`` serv
keystone-legacy-auth: INFO certfile /etc/keystone/ssl/certs/keystone.pem
keystone-legacy-auth: INFO debug True
keystone-legacy-auth: INFO default_store sqlite
keystone-legacy-auth: INFO extensions osksadm,oskscatalog
keystone-legacy-auth: INFO extensions osksadm,oskscatalog,hpidm
keystone-legacy-auth: INFO hash-password True
keystone-legacy-auth: INFO keyfile /etc/keystone/ssl/private/keystonekey.pem
keystone-legacy-auth: INFO keystone-admin-role Admin
@ -156,7 +156,7 @@ Here is an example showing how you can manually start the ``keystone-auth`` serv
admin : INFO certfile /etc/keystone/ssl/certs/keystone.pem
admin : INFO debug True
admin : INFO default_store sqlite
admin : INFO extensions osksadm,oskscatalog
admin : INFO extensions osksadm,oskscatalog,hpidm
admin : INFO hash-password True
admin : INFO keyfile /etc/keystone/ssl/private/keystonekey.pem
admin : INFO keystone-admin-role Admin

View File

@ -71,6 +71,15 @@ RAX-KEY
This is an Admin and Service API extension.
HP-IDM
This extension adds capability to filter roles with optional service IDs
for token validation to mitigate security risks with role name conflicts.
See https://bugs.launchpad.net/keystone/+bug/890411 for more details.
This is an Admin API extension. Applicable to validate token (GET)
and check token (HEAD) APIs only.
.. note::
The included extensions are in the process of being rewritten. Currently

View File

@ -28,7 +28,7 @@ service-header-mappings = {
#List of extensions currently loaded.
#Refer docs for list of supported extensions.
extensions= osksadm,oskscatalog
extensions= osksadm, oskscatalog, hpidm
# Address to bind the API server
# TODO Properties defined within app not available via pipeline.
@ -74,6 +74,18 @@ keystone-service-admin-role = KeystoneServiceAdmin
#Tells whether password user need to be hashed in the backend
hash-password = True
# This property is applicable to hpidm extension only.
# It will be ignored if hpidm extension is disabled.
#
# Specify the global service ID to dictate how the global roles
# are to be returned/processed in validate token call. Notice
# that middle-ware or API clients must specify the exact same
# global service ID in order for Keystone to retrieve the
# global roles in validate token call. Otherwise, it will
# likely result in a 401 since the mismatched global ID
# may not exist in Keystone and therefore considered invalid.
global_service_id = global
[keystone.backends.sqlalchemy]
# SQLAlchemy connection string for the reference implementation registry
# server. Any valid SQLAlchemy connection string is fine.

View File

@ -14,7 +14,7 @@ default_store = sqlite
log_file = keystone.ldap.log
log_dir = .
backends = keystone.backends.sqlalchemy,keystone.backends.ldap
extensions= osksadm,oskscatalog
extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',

View File

@ -23,7 +23,7 @@ default_store = sqlite
log_file = keystone.memcache.log
log_dir = .
backends = keystone.backends.sqlalchemy,keystone.backends.memcache
extensions= osksadm,oskscatalog
extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',

View File

@ -21,6 +21,7 @@ default_store = sqlite
log_file = keystone.ssl.log
log_dir = .
backends = keystone.backends.sqlalchemy
extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',

View File

@ -23,6 +23,7 @@ DEFAULT_BACKENDS = 'keystone.backends.sqlalchemy'
#Configs applicable to all backends.
SHOULD_HASH_PASSWORD = None
GLOBAL_SERVICE_ID = None # to facilitate global roles for validate tokens
def configure_backends(options):
@ -38,3 +39,6 @@ def configure_backends(options):
if "hash-password" in options\
and ast.literal_eval(options["hash-password"]) == True:
SHOULD_HASH_PASSWORD = options["hash-password"]
global GLOBAL_SERVICE_ID
GLOBAL_SERVICE_ID = options.get("global_service_id", "global")

View File

@ -93,6 +93,12 @@ def _match(key, value, attrs):
# This is a wild card search. Implemented as all or nothing for now.
if value == "*":
return True
if key == 'serviceId':
# for serviceId, the backend is returning a list of numbers
# make sure we convert them to strings first before comparing
# them
str_sids = map(lambda x: str(x), attrs[key])
return str(value) in str_sids
if key != "objectclass":
return value in attrs[key]
# it is an objectclass check, so check subclasses

View File

@ -0,0 +1,33 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from keystone.contrib.extensions.admin.extension import BaseExtensionHandler
from keystone.controllers.token import TokenController
class ExtensionHandler(BaseExtensionHandler):
def map_extension_methods(self, mapper, options):
token_controller = TokenController(options)
# Token Operations
mapper.connect("/tokens/{token_id}", controller=token_controller,
action="validate_token",
conditions=dict(method=["GET"]))
mapper.connect("/tokens/{tenant_id}",
controller=token_controller,
action="check_token", conditions=dict(method=["HEAD"]))

View File

@ -0,0 +1,21 @@
{
"extension": {
"name": "HP Token Validation Extension",
"namespace": "http://docs.openstack.org/identity/api/ext/HP-IDM/v1.0",
"alias": "HP-IDM",
"updated": "2011-12-06T19:00:00-00:00",
"description": "Validate token with the optional serviceId parameter so that only the roles associated with the given service IDs are returned. See https://bugs.launchpad.net/keystone/+bug/890411 for more details.",
"links": [
{
"rel": "describedby",
"type": "application/pdf",
"href": "https://github.com/openstack/keystone/raw/master/keystone/content/admin/HP-IDM-admin-devguide.pdf"
},
{
"rel": "describedby",
"type": "application/vnd.sun.wadl+xml",
"href": "https://raw.github.com/openstack/keystone/master/keystone/content/admin/HP-IDM-admin.wadl"
}
]
}
}

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<extension
xmlns="http://docs.openstack.org/common/api/v1.0"
xmlns:atom="http://www.w3.org/2005/Atom"
name="HP Token Validation Extension"
namespace="http://docs.openstack.org/identity/api/ext/HP-IDM/v1.0"
alias="HP-IDM"
updated="2011-12-25T17:00:00-00:00">
<description>
Validate token with the optional serviceId parameter so that only the roles associated with the given service IDs are returned. See https://bugs.launchpad.net/keystone/+bug/890411 for more details.
</description>
<atom:link rel="describedby"
type="application/pdf"
href="https://github.com/openstack/keystone/raw/master/keystone/content/admin/HP-IDM-admin-devguide.pdf"/>
<atom:link rel="describedby"
type="application/vnd.sun.wadl+xml"
href="https://github.com/openstack/keystone/raw/master/keystone/content/admin/HP-IDM-admin.wadl"/>
</extension>

View File

@ -26,6 +26,7 @@ calls from the request routers.
from keystone import utils
from keystone.common import wsgi
from keystone.logic import extension_reader
from keystone.logic.types import auth
from keystone.logic.types import fault
from keystone.logic import service
@ -68,8 +69,12 @@ class TokenController(wsgi.Controller):
def _validate_token(self, req, token_id):
"""Validates the token, and that it belongs to the specified tenant"""
belongs_to = req.GET.get('belongsTo')
service_ids = None
if extension_reader.is_extension_supported(self.options, 'hpidm'):
# service IDs are only relevant if hpidm extension is enabled
service_ids = req.GET.get('HP-IDM-serviceId')
return self.identity_service.validate_token(
utils.get_auth_token(req), token_id, belongs_to)
utils.get_auth_token(req), token_id, belongs_to, service_ids)
@utils.wrap_error
def validate_token(self, req, token_id):

View File

@ -10,6 +10,29 @@ from keystone.logic.types.extension import Extensions
EXTENSIONS_PATH = 'contrib/extensions'
def get_supported_extensions(options):
"""
Returns list of supported extensions.
options - global configuration options
"""
return [extension.strip() for extension in
options.get(CONFIG_EXTENSION_PROPERTY,
DEFAULT_EXTENSIONS).split(',')]
def is_extension_supported(options, extension_name):
"""
Return True if the extension is enabled, False otherwise.
options - global configuration options
extension_name - extension name
extension_name is case-sensitive.
"""
if (extension_name is not None) and (options is not None):
return extension_name in get_supported_extensions(options)
return False
class ExtensionsReader(object):
"""Reader to read static extensions content"""
@ -75,8 +98,7 @@ class ExtensionsReader(object):
def __get_supported_extensions(self):
""" Returns list of supported extensions."""
if self.supported_extensions is None:
self.supported_extensions = self.options.get(
CONFIG_EXTENSION_PROPERTY, DEFAULT_EXTENSIONS).split(',')
self.supported_extensions = get_supported_extensions(self.options)
return self.supported_extensions
def __get_extension_json(self, extension_name):

View File

@ -253,7 +253,7 @@ def get_auth_data(dtoken):
return auth.AuthData(token, user, endpoints, url_types=url_types)
def get_validate_data(dtoken, duser):
def get_validate_data(dtoken, duser, service_ids=None):
"""return ValidateData object for a token/user pair"""
tenant = None
if dtoken.tenant_id:
@ -262,19 +262,14 @@ def get_validate_data(dtoken, duser):
token = auth.Token(dtoken.expires, dtoken.id, tenant)
ts = []
if dtoken.tenant_id:
drole_refs = api.ROLE.ref_get_all_tenant_roles(duser.id,
dtoken.tenant_id)
for drole_ref in drole_refs:
drole = api.ROLE.get(drole_ref.role_id)
ts.append(Role(drole_ref.role_id, drole.name,
None, drole_ref.tenant_id))
drole_refs = api.ROLE.ref_get_all_global_roles(duser.id)
for drole_ref in drole_refs:
drole = api.ROLE.get(drole_ref.role_id)
ts.append(Role(drole_ref.role_id, drole.name,
None, drole_ref.tenant_id))
ts = get_tenant_roles_for_user_and_services(duser.id,
dtoken.tenant_id,
service_ids)
if (not dtoken.tenant_id or not service_ids or
(backends.GLOBAL_SERVICE_ID in service_ids)):
# return the global roles for unscoped tokens or
# its ID is in the service IDs
ts = ts + get_global_roles_for_user(duser.id)
# Also get the user's tenant's name
tenant_name = None
@ -344,6 +339,90 @@ def validate_token(token_id, belongs_to=None, is_check_token=None):
return (token, user)
def parse_service_ids(service_ids):
"""
Method to parse the service IDs string.
service_ids -- comma-separated service IDs
parse and return a list of service IDs.
"""
if service_ids:
return service_ids.rstrip().split(',')
return []
def validate_service_ids(service_ids):
"""
Method to validate the service IDs.
service_ids -- list of service IDs
If not service IDs or encounter an invalid service ID,
fault.UnauthorizedFault will be raised.
"""
if not service_ids:
raise fault.UnauthorizedFault("Missing service IDs")
services = [api.SERVICE.get(service_id) for service_id in service_ids
if not service_id == backends.GLOBAL_SERVICE_ID]
if not all(services):
raise fault.UnauthorizedFault("Invalid service ID: %s" % (service_id))
def get_roles_names_by_service_ids(service_ids):
"""
Method to find all the roles for the given service IDs.
service_ids -- list of service IDs
"""
roles = []
for service_id in service_ids:
if service_id != backends.GLOBAL_SERVICE_ID:
sroles = api.ROLE.get_by_service(service_id=service_id)
if sroles:
roles = roles + sroles
return [role.name for role in roles]
def get_global_roles_for_user(user_id):
"""
Method to return all the global roles for the given user.
user_id -- user ID
"""
ts = []
drole_refs = api.ROLE.ref_get_all_global_roles(user_id)
for drole_ref in drole_refs:
drole = api.ROLE.get(drole_ref.role_id)
ts.append(Role(drole_ref.role_id, drole.name,
None, drole_ref.tenant_id))
return ts
def get_tenant_roles_for_user_and_services(user_id, tenant_id,
service_ids):
"""
Method to return all the tenant roles for the given user,
filtered by service ID.
user_id -- user ID
tenant_id -- tenant ID
service_ids -- service IDs
If service_ids are specified, will return the roles filtered by
service IDs.
"""
ts = []
if tenant_id and user_id:
drole_refs = api.ROLE.ref_get_all_tenant_roles(user_id,
tenant_id)
for drole_ref in drole_refs:
drole = api.ROLE.get(drole_ref.role_id)
ts.append(Role(drole_ref.role_id, drole.name,
None, drole_ref.tenant_id))
if service_ids:
# if service IDs are specified, filter roles by service IDs
sroles_names = get_roles_names_by_service_ids(service_ids)
return [role for role in ts
if role.name in sroles_names]
else:
return ts
class IdentityService(object):
"""Implements the Identity service
@ -474,10 +553,20 @@ class IdentityService(object):
return get_auth_data(dtoken)
@staticmethod
def validate_token(admin_token, token_id, belongs_to=None):
def validate_token(admin_token, token_id, belongs_to=None,
service_ids=None):
validate_service_admin_token(admin_token)
(token, user) = validate_token(token_id, belongs_to, True)
return get_validate_data(token, user)
if service_ids and (token.tenant_id or belongs_to):
# scope token, validate the service IDs if present
service_ids = parse_service_ids(service_ids)
validate_service_ids(service_ids)
auth_data = get_validate_data(token, user, service_ids)
if service_ids and (token.tenant_id or belongs_to):
# we have service Ids and scope token, make sure we have some roles
if not auth_data.user.role_refs.values:
raise fault.UnauthorizedFault("No roles found for scope token")
return auth_data
@staticmethod
def revoke_token(admin_token, token_id):

View File

@ -180,7 +180,7 @@ def process(*args):
raise optparse.OptParseError(ACTION_NOT_SUPPORTED % ('tenants'))
elif (object_type, action) == ('role', 'add'):
if api.add_role(name=object_id):
if api.add_role(name=object_id, service_name=optional_arg(args, 3)):
print "SUCCESS: Role %s created successfully." % object_id
elif (object_type, action) == ('role', 'list'):

View File

@ -54,9 +54,17 @@ def disable_tenant(name):
return db_api.TENANT.update(obj.id, obj)
def add_role(name):
def add_role(name, service_name=None):
obj = db_models.Role()
obj.name = name
names = name.split(":")
if len(names) == 2:
service_name = names[0] or service_name
if service_name:
# we have a role with service prefix, fill in the service ID
service = db_api.SERVICE.get_by_name(name=service_name)
obj.service_id = service.id
return db_api.ROLE.create(obj)

View File

@ -104,6 +104,7 @@ import json
import os
from paste.deploy import loadapp
import time
import urllib
from urlparse import urlparse
from webob.exc import HTTPUnauthorized
from webob.exc import Request, Response
@ -159,6 +160,11 @@ class AuthProtocol(object):
self.service_protocol = conf.get('service_protocol', 'https')
self.service_host = conf.get('service_host')
service_port = conf.get('service_port')
service_ids = conf.get('service_ids')
self.serviceId_qs = ''
if service_ids:
self.serviceId_qs = '?HP-IDM-serviceId=%s' % \
(urllib.quote(service_ids))
if service_port:
self.service_port = int(service_port)
self.service_url = '%s://%s:%s' % (self.service_protocol,
@ -429,7 +435,8 @@ class AuthProtocol(object):
# "X-Auth-Token": admin_token}
# we're using a test token from the ini file for now
conn = http_connect(self.auth_host, self.auth_port, 'GET',
'/v2.0/tokens/%s' % claims, headers=headers,
'/v2.0/tokens/%s%s' % (claims, self.serviceId_qs),
headers=headers,
ssl=(self.auth_protocol == 'https'),
key_file=self.key_file, cert_file=self.cert_file,
timeout=self.auth_timeout)

View File

@ -71,6 +71,7 @@ HTTP_X_AUTHORIZATION
import httplib
import json
import logging
import urllib
from urlparse import urlparse
from webob.exc import HTTPUnauthorized, Request, Response
@ -129,11 +130,17 @@ class AuthProtocol(object):
self.admin_user = conf.get('auth_admin_user')
self.admin_password = conf.get('auth_admin_password')
self.admin_token = conf.get('auth_admin_token')
# bind to one or more service instances
service_ids = conf.get('service_ids')
self.serviceId_qs = ''
if service_ids:
self.serviceId_qs = '?HP-IDM-serviceId=%s' % \
(urllib.quote(service_ids))
def _build_token_uri(self, claims=None):
uri = "/v" + self.auth_api_version + "/tokens" + \
(claims and '/' + claims or '')
return uri
claim_str = "/%s" % claims if claims else ""
return "/v%s/tokens%s%s" % (self.auth_api_version, claim_str,
self.serviceId_qs or '')
def __init__(self, app, conf):
""" Common initialization code """

View File

@ -353,6 +353,7 @@ class KeystoneTest(object):
"""
config_params = {'test_dir': TEST_DIR, 'base_dir': BASE_DIR}
isSsl = False
hpidmDisabled = False
config_name = None
test_files = None
server = None
@ -398,6 +399,10 @@ class KeystoneTest(object):
if (self.isSsl == True):
os.environ['cert_file'] = TEST_CERT
# indicating HP-IDM is disabled
if self.hpidmDisabled:
os.environ['HP-IDM_Disabled'] = 'True'
# run the keystone server
logger.info("Starting the keystone server...")
@ -659,3 +664,10 @@ class LDAPTest(SQLTest):
from keystone.backends.ldap.fakeldap import FakeShelve
db = FakeShelve().get_instance()
db.clear()
class ClientWithoutHPIDMTest(ClientTests):
"""Test with HP-IDM disabled to make sure it is backward compatible"""
config_name = 'sql_no_hpidm.conf.template'
hpidmDisabled = True
test_files = ('keystone.nohpidm.db',)

View File

@ -1,3 +1,4 @@
import os
import unittest2 as unittest
from keystone.test.functional import common
@ -28,13 +29,18 @@ class TestAdminExtensions(common.ApiTestCase):
self.assertIsNotNone(content['extensions']['values'])
found_osksadm = False
found_oskscatalog = False
found_hpidm = False
for value in content['extensions']['values']:
if value['extension']['alias'] == 'OS-KSADM':
found_osksadm = True
if value['extension']['alias'] == 'OS-KSCATALOG':
found_oskscatalog = True
if value['extension']['alias'] == 'HP-IDM':
found_hpidm = True
self.assertTrue(found_osksadm, "Missing OS-KSADM extension.")
self.assertTrue(found_oskscatalog, "Missing OS-KSCATALOG extension.")
if not common.isSsl() and 'HP-IDM_Disabled' not in os.environ:
self.assertTrue(found_hpidm, "Missing HP-IDM extension.")
def test_extensions_xml(self):
r = self.admin_request(path='/extensions.xml')
@ -44,13 +50,18 @@ class TestAdminExtensions(common.ApiTestCase):
"{http://docs.openstack.org/common/api/v1.0}extension")
found_osksadm = False
found_oskscatalog = False
found_hpidm = False
for extension in extensions:
if extension.get("alias") == 'OS-KSADM':
found_osksadm = True
if extension.get("alias") == 'OS-KSCATALOG':
found_oskscatalog = True
if extension.get("alias") == 'HP-IDM':
found_hpidm = True
self.assertTrue(found_osksadm, "Missing OS-KSADM extension.")
self.assertTrue(found_oskscatalog, "Missing OS-KSCATALOG extension.")
if not common.isSsl() and 'HP-IDM_Disabled' not in os.environ:
self.assertTrue(found_hpidm, "Missing HP-IDM extension.")
if __name__ == '__main__':

View File

@ -5,7 +5,7 @@ default_store = sqlite
log_file = %(test_dir)s/keystone.ldap.log
log_dir = %(test_dir)s
backends = keystone.backends.sqlalchemy,keystone.backends.ldap
extensions= osksadm,oskscatalog
extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',

View File

@ -5,7 +5,7 @@ default_store = sqlite
log_file = %(test_dir)s/keystone.memcache.log
log_dir = %(test_dir)s
backends = keystone.backends.sqlalchemy,keystone.backends.memcache
extensions= osksadm,oskscatalog
extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',

View File

@ -5,7 +5,7 @@ default_store = sqlite
log_file = %(test_dir)s/keystone.sql.log
log_dir = %(test_dir)s
backends = keystone.backends.sqlalchemy
extensions= osksadm,oskscatalog
extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',

View File

@ -0,0 +1,54 @@
[DEFAULT]
verbose = False
debug = False
default_store = sqlite
log_file = %(test_dir)s/keystone.sql.log
log_dir = %(test_dir)s
backends = keystone.backends.sqlalchemy
extensions= osksadm, oskscatalog
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',
'cdn' : 'X-CDN-Management-Url'}
service_host = 0.0.0.0
service_port = %(service_port)s
service_ssl = False
admin_host = 0.0.0.0
admin_port = %(admin_port)s
admin_ssl = False
keystone-admin-role = Admin
keystone-service-admin-role = KeystoneServiceAdmin
hash-password = True
[keystone.backends.sqlalchemy]
sql_connection = sqlite://
sql_idle_timeout = 30
backend_entities = ['Endpoints', 'Credentials', 'EndpointTemplates', 'Tenant', 'User', 'UserRoleAssociation', 'Role', 'Token', 'Service']
[pipeline:admin]
pipeline =
urlrewritefilter
d5_compat
admin_api
[pipeline:keystone-legacy-auth]
pipeline =
urlrewritefilter
legacy_auth
d5_compat
service_api
[app:service_api]
paste.app_factory = keystone.server:service_app_factory
[app:admin_api]
paste.app_factory = keystone.server:admin_app_factory
[filter:urlrewritefilter]
paste.filter_factory = keystone.middleware.url:filter_factory
[filter:d5_compat]
paste.filter_factory = keystone.frontends.d5_compat:filter_factory
[filter:legacy_auth]
paste.filter_factory = keystone.frontends.legacy_token_auth:filter_factory

View File

@ -5,6 +5,7 @@ default_store = sqlite
log_file = %(test_dir)s/keystone.ssl.log
log_dir = %(test_dir)s
backends = keystone.backends.sqlalchemy
extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',

View File

@ -229,7 +229,7 @@ class ApiTestCase(RestfulTestCase):
"'Tenant', 'User', 'Credentials', 'EndpointTemplates', "
"'Token', 'Service']",
},
'extensions': 'osksadm,oskscatalog',
'extensions': 'osksadm,oskscatalog,hpidm',
'keystone-admin-role': 'Admin',
'keystone-service-admin-role': 'KeystoneServiceAdmin',
'hash-password': 'True',
@ -1214,12 +1214,15 @@ class FunctionalTestCase(ApiTestCase):
return self.delete_user_role(user_id, tenant_id, **kwargs)
def create_role(self, role_name=None, role_description=None,
service_id=None, **kwargs):
service_id=None, service_name=None, **kwargs):
"""Creates a role for testing
The role name and description are generated from UUIDs.
"""
role_name = optional_str(role_name)
if service_name and not role_name:
role_name = "%s:%s" % (service_name, optional_str(role_name))
else:
role_name = optional_str(role_name)
role_description = optional_str(role_description)
data = {
@ -1517,6 +1520,18 @@ class MiddlewareTestCase(FunctionalTestCase):
"""
use_server = True
def _setup_test_middleware(self):
test_middleware = None
if isinstance(self.middleware, tuple):
test_middleware = HeaderApp()
for filter in self.middleware:
test_middleware = \
filter.filter_factory(self.settings)(test_middleware)
else:
test_middleware = \
self.middleware.filter_factory(self.settings)(HeaderApp())
return test_middleware
def setUp(self, middleware, settings=None):
super(MiddlewareTestCase, self).setUp()
if settings is None:
@ -1535,14 +1550,9 @@ class MiddlewareTestCase(FunctionalTestCase):
cert_file = isSsl()
if cert_file:
settings['certfile'] = cert_file
if isinstance(middleware, tuple):
self.test_middleware = HeaderApp()
for filter in middleware:
self.test_middleware = \
filter.filter_factory(settings)(self.test_middleware)
else:
self.test_middleware = \
middleware.filter_factory(settings)(HeaderApp())
self.settings = settings
self.middleware = middleware
self.test_middleware = self._setup_test_middleware()
name = unique_str()
r = self.create_tenant(tenant_name=name, assert_status=201)
@ -1571,6 +1581,60 @@ class MiddlewareTestCase(FunctionalTestCase):
self.create_endpoint_for_tenant(self.tenant['id'],
self.endpoint_templates[x]['id'])
@unittest.skipIf(isSsl() or 'HP-IDM_Disabled' in os.environ,
"Skipping SSL or HP-IDM tests")
def test_with_service_id(self):
# create a service role so the scope token validation will succeed
role_resp = self.create_role(service_name=self.services[0]['name'])
role = role_resp.json['role']
self.grant_role_to_user(self.tenant_user['id'],
role['id'], self.tenant['id'])
auth_resp = self.authenticate(self.tenant_user['name'],
self.tenant_user['password'],
self.tenant['id'], assert_status=200)
user_token = auth_resp.json['access']['token']['id']
self.settings['service_ids'] = "%s" % self.services[0]['id']
test_middleware = self._setup_test_middleware()
resp = Request.blank('/',
headers={'X-Auth-Token': user_token}) \
.get_response(test_middleware)
self.assertEquals(resp.status_int, 200)
# now give it a bogus service ID to make sure we get a 401
self.settings['service_ids'] = "boguzz"
test_middleware = self._setup_test_middleware()
resp = Request.blank('/',
headers={'X-Auth-Token': user_token}) \
.get_response(test_middleware)
self.assertEquals(resp.status_int, 401)
@unittest.skipUnless(not isSsl() and 'HP-IDM_Disabled' in os.environ,
"Skipping since HP-IDM is enabled")
def test_with_service_id_with_hpidm_disabled(self):
# create a service role so the scope token validation will succeed
role_resp = self.create_role(service_name=self.services[0]['name'])
role = role_resp.json['role']
self.grant_role_to_user(self.tenant_user['id'],
role['id'], self.tenant['id'])
auth_resp = self.authenticate(self.tenant_user['name'],
self.tenant_user['password'],
self.tenant['id'], assert_status=200)
user_token = auth_resp.json['access']['token']['id']
self.settings['service_ids'] = "%s" % self.services[0]['id']
test_middleware = self._setup_test_middleware()
resp = Request.blank('/',
headers={'X-Auth-Token': user_token}) \
.get_response(test_middleware)
self.assertEquals(resp.status_int, 200)
# now give it a bogus service ID to make sure it got ignored
self.settings['service_ids'] = "boguzz"
test_middleware = self._setup_test_middleware()
resp = Request.blank('/',
headers={'X-Auth-Token': user_token}) \
.get_response(test_middleware)
self.assertEquals(resp.status_int, 200)
def test_401_without_token(self):
resp = Request.blank('/').get_response(self.test_middleware)
self.assertEquals(resp.status_int, 401)

View File

@ -0,0 +1,258 @@
import unittest2 as unittest
from keystone.test.functional import common
class TestHPIDMTokensExtension(common.FunctionalTestCase):
"""Test HP-IDM token validation extension"""
def setUp(self):
super(TestHPIDMTokensExtension, self).setUp()
password = common.unique_str()
self.user = self.create_user(user_password=password).json['user']
self.user['password'] = password
self.tenant = self.create_tenant().json['tenant']
self.service = self.create_service().json['OS-KSADM:service']
r = self.create_role(service_name=self.service['name'])
self.role = r.json['role']
self.another_service = self.create_service().json['OS-KSADM:service']
self.service_with_no_users = self.create_service().\
json['OS-KSADM:service']
ar = self.create_role(service_name=self.another_service['name'])
self.another_role = ar.json['role']
rnu = self.create_role(service_name=self.service_with_no_users['name'])
self.role_with_no_users = rnu.json['role']
rns = self.create_role()
self.role_with_no_service = rns.json['role']
self.grant_role_to_user(self.user['id'],
self.role['id'], self.tenant['id'])
self.grant_role_to_user(self.user['id'],
self.role_with_no_service['id'],
self.tenant['id'])
self.grant_role_to_user(self.user['id'],
self.another_role['id'], self.tenant['id'])
self.global_role = self.create_role().json['role']
# crete a global role
self.put_user_role(self.user['id'], self.global_role['id'], None)
def get_token_belongsto(self, token_id, tenant_id, service_ids, **kwargs):
"""GET /tokens/{token_id}?belongsTo={tenant_id}
[&HP-IDM-serviceId={service_ids}]"""
serviceId_qs = ""
if service_ids:
serviceId_qs = "&HP-IDM-serviceId=%s" % (service_ids)
return self.admin_request(method='GET',
path='/tokens/%s?belongsTo=%s%s' % (token_id, tenant_id,
serviceId_qs), **kwargs)
def check_token_belongs_to(self, token_id, tenant_id, service_ids,
**kwargs):
"""HEAD /tokens/{token_id}?belongsTo={tenant_id}
[&HP-IDM-serviceId={service_ids}]"""
serviceId_qs = ""
if service_ids:
serviceId_qs = "&HP-IDM-serviceId=%s" % (service_ids)
return self.admin_request(method='HEAD',
path='/tokens/%s?belongsTo=%s%s' % (token_id, tenant_id,
serviceId_qs), **kwargs)
@unittest.skipIf(common.isSsl(),
"Skipping SSL tests")
def test_token_validation_with_serviceId(self):
scoped = self.post_token(as_json={
'auth': {
'passwordCredentials': {
'username': self.user['name'],
'password': self.user['password']},
'tenantName': self.tenant['name']}}).json['access']
self.assertEqual(scoped['token']['tenant']['id'], self.tenant['id'])
self.assertEqual(scoped['token']['tenant']['name'],
self.tenant['name'])
# And an admin should be able to validate that our new token is scoped
r = self.get_token_belongsto(token_id=scoped['token']['id'],
tenant_id=self.tenant['id'], service_ids=self.service['id'])
access = r.json['access']
self.assertEqual(access['user']['id'], self.user['id'])
self.assertEqual(access['user']['name'], self.user['name'])
self.assertEqual(access['token']['tenant']['id'], self.tenant['id'])
self.assertEqual(access['token']['tenant']['name'],
self.tenant['name'])
# make sure only the service roles are returned
self.assertIsNotNone(access['user'].get('roles'))
self.assertEqual(len(access['user']['roles']), 1)
self.assertEqual(access['user']['roles'][0]['name'],
self.role['name'])
# make sure check token also works
self.check_token_belongs_to(token_id=scoped['token']['id'],
tenant_id=self.tenant['id'], service_ids=self.service['id'],
assert_status=200)
@unittest.skipIf(common.isSsl(),
"Skipping SSL tests")
def test_token_validation_with_all_serviceId(self):
scoped = self.post_token(as_json={
'auth': {
'passwordCredentials': {
'username': self.user['name'],
'password': self.user['password']},
'tenantName': self.tenant['name']}}).json['access']
self.assertEqual(scoped['token']['tenant']['id'], self.tenant['id'])
self.assertEqual(scoped['token']['tenant']['name'],
self.tenant['name'])
# And an admin should be able to validate that our new token is scoped
service_ids = "%s,%s" % \
(self.service['id'], self.another_service['id'])
r = self.get_token_belongsto(token_id=scoped['token']['id'],
tenant_id=self.tenant['id'], service_ids=service_ids)
access = r.json['access']
self.assertEqual(access['user']['id'], self.user['id'])
self.assertEqual(access['user']['name'], self.user['name'])
self.assertEqual(access['token']['tenant']['id'], self.tenant['id'])
self.assertEqual(access['token']['tenant']['name'],
self.tenant['name'])
# make sure only the service roles are returned
self.assertIsNotNone(access['user'].get('roles'))
self.assertEqual(len(access['user']['roles']), 2)
role_names = map(lambda x: x['name'], access['user']['roles'])
self.assertTrue(self.role['name'] in role_names)
self.assertTrue(self.another_role['name'] in role_names)
@unittest.skipIf(common.isSsl(),
"Skipping SSL tests")
def test_token_validation_with_no_user_service(self):
scoped = self.post_token(as_json={
'auth': {
'passwordCredentials': {
'username': self.user['name'],
'password': self.user['password']},
'tenantName': self.tenant['name']}}).json['access']
self.assertEqual(scoped['token']['tenant']['id'], self.tenant['id'])
self.assertEqual(scoped['token']['tenant']['name'],
self.tenant['name'])
# And an admin should be able to validate that our new token is scoped
service_ids = "%s,%s,%s" % (self.service['id'],
self.another_service['id'],
self.service_with_no_users['id'])
r = self.get_token_belongsto(token_id=scoped['token']['id'],
tenant_id=self.tenant['id'], service_ids=service_ids)
access = r.json['access']
self.assertEqual(access['user']['id'], self.user['id'])
self.assertEqual(access['user']['name'], self.user['name'])
self.assertEqual(access['token']['tenant']['id'], self.tenant['id'])
self.assertEqual(access['token']['tenant']['name'],
self.tenant['name'])
# make sure only the service roles are returned, excluding the one
# with no users
self.assertIsNotNone(access['user'].get('roles'))
self.assertEqual(len(access['user']['roles']), 2)
role_names = map(lambda x: x['name'], access['user']['roles'])
self.assertTrue(self.role['name'] in role_names)
self.assertTrue(self.another_role['name'] in role_names)
# make sure check token also works
self.check_token_belongs_to(token_id=scoped['token']['id'],
tenant_id=self.tenant['id'], service_ids=service_ids,
assert_status=200)
@unittest.skipIf(common.isSsl(),
"Skipping SSL tests")
def test_token_validation_without_serviceId(self):
scoped = self.post_token(as_json={
'auth': {
'passwordCredentials': {
'username': self.user['name'],
'password': self.user['password']},
'tenantName': self.tenant['name']}}).json['access']
self.assertEqual(scoped['token']['tenant']['id'], self.tenant['id'])
self.assertEqual(scoped['token']['tenant']['name'],
self.tenant['name'])
# And an admin should be able to validate that our new token is scoped
r = self.get_token_belongsto(token_id=scoped['token']['id'],
tenant_id=self.tenant['id'], service_ids=None)
access = r.json['access']
self.assertEqual(access['user']['id'], self.user['id'])
self.assertEqual(access['user']['name'], self.user['name'])
self.assertEqual(access['token']['tenant']['id'], self.tenant['id'])
self.assertEqual(access['token']['tenant']['name'],
self.tenant['name'])
# make sure all the roles are returned
self.assertIsNotNone(access['user'].get('roles'))
self.assertEqual(len(access['user']['roles']), 4)
role_names = map(lambda x: x['name'], access['user']['roles'])
self.assertTrue(self.role['name'] in role_names)
self.assertTrue(self.another_role['name'] in role_names)
self.assertTrue(self.global_role['name'] in role_names)
self.assertTrue(self.role_with_no_service['name'] in role_names)
@unittest.skipIf(common.isSsl(),
"Skipping SSL tests")
def test_token_validation_with_global_service_id(self):
scoped = self.post_token(as_json={
'auth': {
'passwordCredentials': {
'username': self.user['name'],
'password': self.user['password']},
'tenantName': self.tenant['name']}}).json['access']
self.assertEqual(scoped['token']['tenant']['id'], self.tenant['id'])
self.assertEqual(scoped['token']['tenant']['name'],
self.tenant['name'])
service_ids = "%s,%s,global" % (self.service['id'],
self.another_service['id'])
r = self.get_token_belongsto(token_id=scoped['token']['id'],
tenant_id=self.tenant['id'], service_ids=service_ids)
access = r.json['access']
self.assertEqual(access['user']['id'], self.user['id'])
self.assertEqual(access['user']['name'], self.user['name'])
self.assertEqual(access['token']['tenant']['id'], self.tenant['id'])
self.assertEqual(access['token']['tenant']['name'],
self.tenant['name'])
# make sure only the service roles are returned
self.assertIsNotNone(access['user'].get('roles'))
self.assertEqual(len(access['user']['roles']), 3)
role_names = map(lambda x: x['name'], access['user']['roles'])
self.assertTrue(self.role['name'] in role_names)
self.assertTrue(self.another_role['name'] in role_names)
self.assertTrue(self.global_role['name'] in role_names)
@unittest.skipIf(common.isSsl(),
"Skipping SSL tests")
def test_token_validation_with_bogus_service_id(self):
scoped = self.post_token(as_json={
'auth': {
'passwordCredentials': {
'username': self.user['name'],
'password': self.user['password']},
'tenantName': self.tenant['name']}}).json['access']
self.assertEqual(scoped['token']['tenant']['id'], self.tenant['id'])
self.assertEqual(scoped['token']['tenant']['name'],
self.tenant['name'])
service_ids = "%s,%s,boguzzz" % (self.service['id'],
self.another_service['id'])
self.get_token_belongsto(token_id=scoped['token']['id'],
tenant_id=self.tenant['id'], service_ids=service_ids,
assert_status=401)
# make sure check token also works
self.check_token_belongs_to(token_id=scoped['token']['id'],
tenant_id=self.tenant['id'], service_ids=service_ids,
assert_status=401)
if __name__ == '__main__':
unittest.main()

View File

@ -6,8 +6,15 @@ DEFAULT_FIXTURE = [
('tenant', 'add', 'ANOTHER:TENANT'),
('tenant', 'add', 'project-y'),
('tenant', 'disable', 'project-y'),
('tenant', 'add', 'coffee-tea'),
# Add some services for the service roles
('service', 'add', 'coffee',
'coffee service', 'coffee service'),
('service', 'add', 'tea',
'tea house', 'tea house'),
# Users
('user', 'add', 'joeuser', 'secrete', 'customer-x'),
('user', 'add', 'pete', 'secrete', 'coffee-tea'),
('user', 'add', 'joeadmin', 'secrete', 'customer-x'),
('user', 'add', 'admin', 'secrete'),
('user', 'add', 'serviceadmin', 'secrete', 'customer-x'),
@ -24,6 +31,11 @@ DEFAULT_FIXTURE = [
('role', 'grant', 'Admin', 'nodefaulttenant', 'customer-x'),
('role', 'add', 'Member'),
('role', 'grant', 'Member', 'joeuser', 'customer-x'),
('role', 'add', 'barista', 'coffee'),
('role', 'add', 'barista', 'tea'),
('role', 'grant', 'barista', 'pete'),
('role', 'add', 'barista-global'),
('role', 'grant', 'barista-global', 'pete'),
# Add Services
#1 Service Name:exampleservice Type:example type
('service', 'add', 'exampleservice',

View File

@ -28,6 +28,7 @@ TESTS = [
# But tests pass
# MemcacheTest,
test.SSLTest,
test.ClientWithoutHPIDMTest,
]

View File

@ -13,6 +13,7 @@ function usage {
echo " SSLTest: runs client tests with SSL configured"
echo " LDAPTest: runs functional tests with LDAP backend"
echo " MemcacheTest: runs functional tests with memcached storing tokens"
echo " ClientWithoutHPIDMTest: runs client tests with HP-IDM extension disabled"
echo " Note: by default, run tests will run all suites"
echo " -V, --virtual-env Always use virtualenv. Install automatically if not present"
echo " -N, --no-virtual-env Don't use virtualenv. Run tests in local environment"