v3 token API

Also implemented the following:

blueprint pluggable-identity-authentication-handlers
blueprint stop-ids-in-uris
blueprint multi-factor-authn (just the plumbing)

What's missing?

* domain scoping (will be implemented by Henry?)

Change-Id: I191c0b2cb3367b2a5f8a2dc674c284bb13ea97e3
This commit is contained in:
Guang Yee 2013-01-08 08:46:20 -08:00
parent d036db145d
commit 9f812939d4
34 changed files with 1833 additions and 134 deletions

View File

@ -75,6 +75,7 @@ values are organized into the following sections:
* ``[policy]`` - policy system driver configuration for RBAC
* ``[signing]`` - cryptographic signatures for PKI based tokens
* ``[ssl]`` - SSL configuration
* ``[auth]`` - Authentication plugin configuration
The Keystone configuration file is expected to be named ``keystone.conf``.
When starting keystone, you can specify a different configuration file to
@ -88,6 +89,59 @@ order:
* ``/etc/``
Authentication Plugins
----------------------
Keystone supports authentication plugins and they are specified
in the ``[auth]`` section of the configuration file. However, an
authentication plugin may also have its own section in the configuration
file. It is up to the plugin to register its own configuration options.
* ``methods`` - comma-delimited list of authentication plugin names
* ``<plugin name>`` - specify the class which handles to authentication method, in the same manner as one would specify a backend driver.
Keystone provides two authentication methods by default. ``password`` handles password authentication and ``token`` handles token authentication.
How to Implement an Authentication Plugin
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
All authentication plugins must extend the
``keystone.auth.core.AuthMethodHandler`` class and implement the
``authenticate()`` method. The ``authenticate()`` method expects the
following parameters.
* ``context`` - Keystone's request context
* ``auth_payload`` - the content of the authentication for a given method
* ``auth_context`` - user authentication context, a dictionary shared by all plugins. It contains ``method_names`` and ``extras`` by default. ``method_names`` is a list and ``extras`` is a dictionary.
If successful, the ``authenticate()`` method must provide a valid ``user_id``
in ``auth_context`` and return ``None``. ``method_name`` is used to convey
any additional authentication methods in case authentication is for re-scoping.
For example, if the authentication is for re-scoping, a plugin must append
the previous method names into ``method_names``. Also, a plugin may add any
additional information into ``extras``. Anything in ``extras`` will be
conveyed in the token's ``extras`` field.
If authentication requires multiple steps, the ``authenticate()`` method must
return the payload in the form of a dictionary for the next authentication
step.
If authentication is unsuccessful, the ``authenticate()`` method must raise a
``keystone.exception.Unauthorized`` exception.
Simply add the new plugin name to the ``methods`` list along with your plugin
class configuration in the ``[auth]`` sections of the configuration file
to deploy it.
If the plugin require addition configurations, it may register its own section
in the configuration file.
Plugins are invoked in the order in which they are specified in the ``methods``
attribute of the ``authentication`` request body. If multiple plugins are
invoked, all plugins must succeed in order to for the entire
authentication to be successful. Furthermore, all the plugins invoked must
agree on the ``user_id`` in the ``auth_context``.
Certificates for PKI
--------------------

View File

@ -193,6 +193,11 @@
# group_allow_update = True
# group_allow_delete = True
[auth]
methods = password,token
password = keystone.auth.methods.password.Password
token = keystone.auth.methods.token.Token
[filter:debug]
paste.filter_factory = keystone.common.wsgi:Debug.factory

View File

@ -32,6 +32,16 @@
"identity:update_user": [["rule:admin_required"]],
"identity:delete_user": [["rule:admin_required"]],
"identity:get_group": [["rule:admin_required"]],
"identity:list_groups": [["rule:admin_required"]],
"identity:create_group": [["rule:admin_required"]],
"identity:update_group": [["rule:admin_required"]],
"identity:delete_group": [["rule:admin_required"]],
"identity:list_users_in_group": [["rule:admin_required"]],
"identity:remove_user_from_group": [["rule:admin_required"]],
"identity:check_user_in_group": [["rule:admin_required"]],
"identity:add_user_to_group": [["rule:admin_required"]],
"identity:get_credential": [["rule:admin_required"]],
"identity:list_credentials": [["rule:admin_required"]],
"identity:create_credential": [["rule:admin_required"]],
@ -41,8 +51,8 @@
"identity:get_role": [["rule:admin_required"]],
"identity:list_roles": [["rule:admin_required"]],
"identity:create_role": [["rule:admin_required"]],
"identity:update_roles": [["rule:admin_required"]],
"identity:delete_roles": [["rule:admin_required"]],
"identity:update_role": [["rule:admin_required"]],
"identity:delete_role": [["rule:admin_required"]],
"identity:check_grant": [["rule:admin_required"]],
"identity:list_grants": [["rule:admin_required"]],
@ -53,5 +63,10 @@
"identity:list_policies": [["rule:admin_required"]],
"identity:create_policy": [["rule:admin_required"]],
"identity:update_policy": [["rule:admin_required"]],
"identity:delete_policy": [["rule:admin_required"]]
"identity:delete_policy": [["rule:admin_required"]],
"identity:check_token": [["rule:admin_required"]],
"identity:validate_token": [["rule:admin_required"]],
"identity:revocation_list": [["rule:admin_required"]],
"identity:revoke_token": [["rule:admin_required"], ["user_id:%(user_id)s"]]
}

19
keystone/auth/__init__.py Normal file
View File

@ -0,0 +1,19 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.auth.core import AuthMethodHandler
from keystone.auth import controllers
from keystone.auth import routers

View File

@ -0,0 +1,388 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from keystone.auth import token_factory
from keystone.common import controller
from keystone.common import cms
from keystone.common import logging
from keystone import config
from keystone import exception
from keystone import identity
from keystone import token
from keystone.openstack.common import importutils
LOG = logging.getLogger(__name__)
CONF = config.CONF
# registry of authentication methods
AUTH_METHODS = {}
# register method drivers
for method_name in CONF.auth.methods:
try:
config.register_str(method_name, group='auth')
except Exception as e:
# don't care about duplicate error
LOG.warn(e)
def load_auth_method(method_name):
if method_name not in CONF.auth.methods:
raise exception.AuthMethodNotSupported()
driver = CONF.auth.get(method_name)
return importutils.import_object(driver)
def get_auth_method(method_name):
global AUTH_METHODS
if method_name not in AUTH_METHODS:
AUTH_METHODS[method_name] = load_auth_method(method_name)
return AUTH_METHODS[method_name]
class AuthInfo(object):
""" Encapsulation of "auth" request. """
def __init__(self, context, auth=None):
self.identity_api = identity.Manager()
self.context = context
self.auth = auth
self._scope_data = (None, None)
# self._scope_data is (domain_id, project_id)
# project scope: (None, project_id)
# domain scope: (domain_id, None)
# unscoped: (None, None)
self._validate_and_normalize_auth_data()
def _assert_project_is_enabled(self, project_ref):
# ensure the project is enabled
if not project_ref.get('enabled', True):
msg = _('Project is disabled: %s') % project_ref['id']
LOG.warning(msg)
raise exception.Unauthorized(msg)
def _assert_domain_is_enabled(self, domain_ref):
if not domain_ref.get('enabled'):
msg = _('Domain is disabled: %s') % (domain_ref['id'])
LOG.warning(msg)
raise exception.Unauthorized(msg)
def _assert_user_is_enabled(self, user_ref):
if not user_ref.get('enabled', True):
msg = _('User is disabled: %s') % (user_ref['id'])
LOG.warning(msg)
raise exception.Unauthorized(msg)
def _lookup_domain(self, domain_info):
domain_id = domain_info.get('id')
domain_name = domain_info.get('name')
domain_ref = None
if not domain_id and not domain_name:
raise exception.ValidationError(attribute='id or name',
target='domain')
try:
if domain_name:
domain_ref = self.identity_api.get_domain_by_name(
context=self.context, domain_name=domain_name)
else:
domain_ref = self.identity_api.get_domain(
context=self.context, domain_id=domain_id)
except exception.DomainNotFound as e:
LOG.exception(e)
raise exception.Unauthorized(e)
self._assert_domain_is_enabled(domain_ref)
return domain_ref
def _lookup_project(self, project_info):
project_id = project_info.get('id')
project_name = project_info.get('name')
project_ref = None
if not project_id and not project_name:
raise exception.ValidationError(attribute='id or name',
target='project')
try:
if project_name:
if 'domain' not in project_info:
raise exception.ValidationError(attribute='domain',
target='project')
domain_ref = self._lookup_domain(project_info['domain'])
project_ref = self.identity_api.get_project_by_name(
context=self.context, tenant_name=project_name,
domain_id=domain_ref['id'])
else:
project_ref = self.identity_api.get_project(
context=self.context, tenant_id=project_id)
except exception.ProjectNotFound as e:
LOG.exception(e)
raise exception.Unauthorized(e)
self._assert_project_is_enabled(project_ref)
return project_ref
def lookup_user(self, user_info):
user_id = user_info.get('id')
user_name = user_info.get('name')
user_ref = None
if not user_id and not user_name:
raise exception.ValidationError(attribute='id or name',
target='user')
try:
if user_name:
if 'domain' not in user_info:
raise exception.ValidationError(attribute='domain',
target='user')
domain_ref = self._lookup_domain(user_info['domain'])
user_ref = self.identity_api.get_user_by_name(
context=self.context, user_name=user_name,
domain_id=domain_ref['id'])
else:
user_ref = self.identity_api.get_user(
context=self.context, user_id=user_id)
except exception.UserNotFound as e:
LOG.exception(e)
raise exception.Unauthorized(e)
self._assert_user_is_enabled(user_ref)
return user_ref
def _validate_and_normalize_scope_data(self):
""" Validate and normalize scope data """
if 'scope' not in self.auth:
return
# if scoped, only to a project or domain, but not both
if ('project' not in self.auth['scope'] and
'domain' not in self.auth['scope']):
# neither domain or project provided
raise exception.ValidationError(attribute='project or domain',
target='scope')
if ('project' in self.auth['scope'] and
'domain' in self.auth['scope']):
# both domain and project provided
raise exception.ValidationError(attribute='project or domain',
target='scope')
if 'project' in self.auth['scope']:
project_ref = self._lookup_project(self.auth['scope']['project'])
self._scope_data = (None, project_ref['id'])
else:
domain_ref = self._lookup_domain(self.auth['scope']['domain'])
self._scope_data = (domain_ref['id'], None)
def _validate_auth_methods(self):
# make sure auth methods are provided
if 'methods' not in self.auth['authentication']:
raise exception.ValidationError(attribute='methods',
target='authentication')
# make sure all the method data/payload are provided
for method_name in self.get_method_names():
if method_name not in self.auth['authentication']:
raise exception.ValidationError(attribute=method_name,
target='authentication')
# make sure auth method is supported
for method_name in self.get_method_names():
if method_name not in CONF.auth.methods:
raise exception.AuthMethodNotSupported()
def _validate_and_normalize_auth_data(self):
""" Make sure "auth" is valid. """
# make sure "auth" exist
if not self.auth:
raise exception.ValidationError(attribute='auth',
target='request body')
self._validate_auth_methods()
self._validate_and_normalize_scope_data()
def get_method_names(self):
""" Returns the authentication method names.
:returns: list of auth method names
"""
return self.auth['authentication']['methods']
def get_method_data(self, method):
""" Get the auth method payload.
:returns: auth method payload
"""
if method not in self.auth['authentication']['methods']:
raise exception.ValidationError(attribute=method_name,
target='authentication')
return self.auth['authentication'][method]
def get_scope(self):
""" Get scope information.
Verify and return the scoping information.
:returns: (domain_id, project_id). If scope to a project,
(None, project_id) will be returned. If scope to a domain,
(domain_id, None) will be returned. If unscope,
(None, None) will be returned.
"""
return self._scope_data
def set_scope(self, domain_id=None, project_id=None):
""" Set scope information. """
if domain_id and project_id:
msg = _('Scoping to both domain and project is not allowed')
raise ValueError(msg)
self._scope_data = (domain_id, project_id)
class Auth(controller.V3Controller):
def __init__(self, *args, **kw):
super(Auth, self).__init__(*args, **kw)
self.token_controllers_ref = token.controllers.Auth()
def authenticate_for_token(self, context, authentication, scope=None):
""" Authenticate user and issue a token. """
try:
auth = None
auth = {'authentication': authentication}
if scope:
auth['scope'] = scope
auth_info = AuthInfo(context, auth=auth)
auth_context = {'extras': {}, 'method_names': []}
self.authenticate(context, auth_info, auth_context)
self._check_and_set_default_scoping(context, auth_info,
auth_context)
(token_id, token_data) = token_factory.create_token(
context, auth_context, auth_info)
return token_factory.render_token_data_response(token_id,
token_data)
except (exception.Unauthorized,
exception.AuthMethodNotSupported,
exception.AdditionalAuthRequired) as e:
raise e
except Exception as e:
LOG.exception(e)
raise exception.Unauthorized(e)
def _check_and_set_default_scoping(self, context, auth_info, auth_context):
(domain_id, project_id) = auth_info.get_scope()
if domain_id or project_id:
# scope is specified
return
# fill in default_project_id if it is available
try:
user_ref = self.identity_api.get_user(
context=context, user_id=auth_context['user_id'])
default_project_id = user_ref.get('default_project_id')
if default_project_id:
auth_info.set_scope(domain_id=None,
project_id=default_project_id)
except exception.UserNotFound as e:
LOG.exception(e)
raise exception.Unauthorized(e)
def _build_remote_user_auth_context(self, context, auth_info,
auth_context):
username = context['REMOTE_USER']
# FIXME(gyee): REMOTE_USER is not good enough since we are
# requiring domain_id to do user lookup now. Try to get
# the user_id from auth_info for now, assuming external auth
# has check to make sure user is the same as the one specify
# in "authentication".
if 'password' in auth_info.get_method_names():
user_info = auth_info.get_method_data('password')
user_ref = auth_info.lookup_user(user_info['user'])
auth_context['user_id'] = user_ref['id']
else:
msg = _('Unable to lookup user %s') % (username)
raise exception.Unauthorized(msg)
def authenticate(self, context, auth_info, auth_context):
""" Authenticate user. """
# user have been authenticated externally
if 'REMOTE_USER' in context:
self._build_remote_user_auth_context(context,
auth_info,
auth_context)
return
# need to aggregate the results in case two or more methods
# are specified
auth_response = {'methods': []}
for method_name in auth_info.get_method_names():
method = get_auth_method(method_name)
resp = method.authenticate(context,
auth_info.get_method_data(method_name),
auth_context)
if resp:
auth_response['methods'].append(method_name)
auth_response[method_name] = resp
if len(auth_response["methods"]) > 0:
# authentication continuation required
raise exception.AdditionalAuthRequired(auth_response)
if 'user_id' not in auth_context:
msg = _('User not found')
raise exception.Unauthorized(msg)
def _get_token_ref(self, context, token_id, belongs_to=None):
token_ref = self.token_api.get_token(context=context,
token_id=token_id)
if cms.is_ans1_token(token_id):
verified_token = cms.cms_verify(cms.token_to_cms(token_id),
CONF.signing.certfile,
CONF.signing.ca_certs)
token_ref = json.loads(verified_token)
if belongs_to:
assert token_ref['project']['id'] == belongs_to
return token_ref
@controller.protected
def check_token(self, context):
try:
token_id = context.get('subject_token_id')
belongs_to = context['query_string'].get('belongsTo')
assert self._get_token_ref(context, token_id, belongs_to)
except Exception as e:
LOG.error(e)
raise exception.Unauthorized(e)
@controller.protected
def revoke_token(self, context):
token_id = context.get('subject_token_id')
return self.token_controllers_ref.delete_token(context, token_id)
@controller.protected
def validate_token(self, context):
token_id = context.get('subject_token_id')
self.check_token(context)
token_ref = self.token_api.get_token(context, token_id)
return token_factory.recreate_token_data(context,
token_ref.get('token_data'),
token_ref['expires'],
token_ref.get('user'),
token_ref.get('tenant'))
@controller.protected
def revocation_list(self, context, auth=None):
return self.token_controllers_ref.revocation_list(context, auth)

83
keystone/auth/core.py Normal file
View File

@ -0,0 +1,83 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone import exception
from keystone.common import dependency
@dependency.requires('identity_api')
class AuthMethodHandler(object):
"""Abstract base class for an authentication plugin."""
def __init__(self):
pass
def authenticate(self, context, auth_payload, auth_context):
"""Authenticate user and return an authentication context.
:param context: keystone's request context
:auth_payload: the content of the authentication for a given method
:auth_context: user authentication context, a dictionary shared
by all plugins. It contains "method_names" and "extras"
by default. "method_names" is a list and "extras" is
a dictionary.
If successful, plugin must set "user_id" in "auth_context".
"method_name" is used to convey any additional authentication methods
in case authentication is for re-scoping. For example,
if the authentication is for re-scoping, plugin must append the
previous method names into "method_names". Also, plugin may add
any additional information into "extras". Anything in "extras"
will be conveyed in the token's "extras" field. Here's an example of
"auth_context" on successful authentication.
{"user_id": "abc123",
"methods": ["password", "token"],
"extras": {}}
Plugins are invoked in the order in which they are specified in the
"methods" attribute of the "authentication" request body.
For example, with the following authentication request,
{"authentication": {
"methods": ["custom-plugin", "password", "token"],
"token": {
"id": "sdfafasdfsfasfasdfds"
},
"custom-plugin": {
"custom-data": "sdfdfsfsfsdfsf"
},
"password": {
"user": {
"id": "s23sfad1",
"password": "secrete"
}
}
}}
plugins will be invoked in this order:
1. custom-plugin
2. password
3. token
:returns: None if authentication is successful.
Authentication payload in the form of a dictionary for the
next authentication step if this is a multi step
authentication.
:raises: exception.Unauthorized for authentication failure
"""
raise exception.Unauthorized()

View File

View File

@ -0,0 +1,114 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.common import logging
from keystone import auth
from keystone import exception
from keystone import identity
METHOD_NAME = 'password'
LOG = logging.getLogger(__name__)
class UserAuthInfo(object):
def __init__(self, context, auth_payload):
self.identity_api = identity.Manager()
self.context = context
self.user_id = None
self.password = None
self.user_ref = None
self._validate_and_normalize_auth_data(auth_payload)
def _assert_domain_is_enabled(self, domain_ref):
if not domain_ref.get('enabled'):
msg = _('Domain is disabled: %s') % (domain_ref['id'])
LOG.warning(msg)
raise exception.Unauthorized(msg)
def _assert_user_is_enabled(self, user_ref):
if not user_ref.get('enabled', True):
msg = _('User is disabled: %s') % (user_ref['id'])
LOG.warning(msg)
raise exception.Unauthorized(msg)
def _lookup_domain(self, domain_info):
domain_id = domain_info.get('id')
domain_name = domain_info.get('name')
domain_ref = None
if not domain_id and not domain_name:
raise exception.ValidationError(attribute='id or name',
target='domain')
try:
if domain_name:
domain_ref = self.identity_api.get_domain_by_name(
context=self.context, domain_name=domain_name)
else:
domain_ref = self.identity_api.get_domain(
context=self.context, domain_id=domain_id)
except exception.DomainNotFound as e:
LOG.exception(e)
raise exception.Unauthorized(e)
self._assert_domain_is_enabled(domain_ref)
return domain_ref
def _validate_and_normalize_auth_data(self, auth_payload):
if 'user' not in auth_payload:
raise exception.ValidationError(attribute='user',
target=METHOD_NAME)
user_info = auth_payload['user']
user_id = user_info.get('id')
user_name = user_info.get('name')
user_ref = None
if not user_id and not user_name:
raise exception.ValidationError(attribute='id or name',
target='user')
self.password = user_info.get('password', None)
try:
if user_name:
if 'domain' not in user_info:
raise exception.ValidationError(attribute='domain',
target='user')
domain_ref = self._lookup_domain(user_info['domain'])
user_ref = self.identity_api.get_user_by_name(
context=self.context, user_name=user_name,
domain_id=domain_ref['id'])
else:
user_ref = self.identity_api.get_user(
context=self.context, user_id=user_id)
except exception.UserNotFound as e:
LOG.exception(e)
raise exception.Unauthorized(e)
self._assert_user_is_enabled(user_ref)
self.user_ref = user_ref
self.user_id = user_ref['id']
class Password(auth.AuthMethodHandler):
def authenticate(self, context, auth_payload, user_context):
""" Try to authenticate against the identity backend. """
user_info = UserAuthInfo(context, auth_payload)
# FIXME: identity.authenticate() can use some refactoring since
# all we care is password matches
user_auth_data = self.identity_api.authenticate(
context=context,
user_id=user_info.user_id,
password=user_info.password)
if 'user_id' not in user_context:
user_context['user_id'] = user_info.user_id

View File

@ -0,0 +1,49 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.common import dependency
from keystone.common import logging
from keystone import auth
from keystone import exception
from keystone import token
METHOD_NAME = 'token'
LOG = logging.getLogger(__name__)
class Token(auth.AuthMethodHandler):
def __init__(self):
self.token_api = token.Manager()
def authenticate(self, context, auth_payload, user_context):
try:
if 'id' not in auth_payload:
raise exception.ValidationError(attribute='id',
target=METHOD_NAME)
token_id = auth_payload['id']
token_ref = self.token_api.get_token(context, token_id)
user_context.setdefault('user_id',
token_ref['token_data']['user']['id'])
user_context.setdefault('expires',
token_ref['expires'])
user_context['extras'].update(token_ref['token_data']['extras'])
user_context['method_names'] += token_ref['token_data']['methods']
except AssertionError as e:
LOG.error(e)
raise exception.Unauthorized(e)

43
keystone/auth/routers.py Normal file
View File

@ -0,0 +1,43 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.auth import controllers
from keystone.common import router
def append_v3_routers(mapper, routers):
auth_controller = controllers.Auth()
mapper.connect('/auth/tokens',
controller=auth_controller,
action='authenticate_for_token',
conditions=dict(method=['POST']))
mapper.connect('/auth/tokens',
controller=auth_controller,
action='check_token',
conditions=dict(method=['HEAD']))
mapper.connect('/auth/tokens',
controller=auth_controller,
action='revoke_token',
conditions=dict(method=['DELETE']))
mapper.connect('/auth/tokens',
controller=auth_controller,
action='validate_token',
conditions=dict(method=['GET']))
mapper.connect('/auth/tokens/OS-PKI/revoked',
controller=auth_controller,
action='revocation_list',
conditions=dict(method=['GET']))

View File

@ -0,0 +1,238 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Token Factory"""
import json
import uuid
import webob
from keystone.common import cms
from keystone.common import logging
from keystone.common import utils
from keystone import catalog
from keystone import config
from keystone import exception
from keystone import identity
from keystone import token as token_module
from keystone.openstack.common import jsonutils
from keystone.openstack.common import timeutils
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TokenDataHelper(object):
"""Token data helper."""
def __init__(self, context):
self.identity_api = identity.Manager()
self.catalog_api = catalog.Manager()
self.context = context
def _get_filtered_domain(self, domain_id):
domain_ref = self.identity_api.get_domain(self.context,
domain_id)
return {'id': domain_ref['id'], 'name': domain_ref['name']}
def _populate_scope(self, token_data, domain_id, project_id):
if domain_id:
token_data['domain'] = self._get_filtered_domain(domain_id)
if project_id:
project_ref = self.identity_api.get_project(
self.context, project_id)
filtered_project = {
'id': project_ref['id'],
'name': project_ref['name']}
filtered_project['domain'] = self._get_filtered_domain(
project_ref['domain_id'])
token_data['project'] = filtered_project
def _get_project_roles_for_user(self, user_id, project_id):
roles = self.identity_api.get_roles_for_user_and_project(
self.context, user_id, project_id)
roles_ref = []
for role_id in roles:
role_ref = self.identity_api.get_role(self.context, role_id)
role_ref.setdefault('project_id', project_id)
roles_ref.append(role_ref)
# user have no project roles, therefore access denied
if len(roles_ref) == 0:
msg = _('User have no access to project')
LOG.debug(msg)
raise exception.Unauthorized(msg)
return roles_ref
def _get_roles_for_user(self, user_id, domain_id, project_id):
roles = []
if domain_id:
# TODO(gyee): get domain roles
pass
if project_id:
roles = self._get_project_roles_for_user(user_id, project_id)
return roles
def _populate_user(self, token_data, user_id, domain_id, project_id):
user_ref = self.identity_api.get_user(self.context,
user_id)
filtered_user = {
'id': user_ref['id'],
'name': user_ref['name'],
'domain': self._get_filtered_domain(user_ref['domain_id'])}
token_data['user'] = filtered_user
def _populate_roles(self, token_data, user_id, domain_id, project_id):
if domain_id or project_id:
roles = self._get_roles_for_user(user_id, domain_id, project_id)
# we only care about id and name
filtered_roles = []
for role in roles:
filtered_roles.append({'id': role['id'], 'name': role['name']})
token_data['roles'] = filtered_roles
def _populate_service_catalog(self, token_data, user_id,
domain_id, project_id):
service_catalog = self.catalog_api.get_v3_catalog(self.context,
user_id,
project_id)
# TODO(gyee): v3 service catalog is not quite completed yet
token_data['catalog'] = service_catalog
def _populate_token(self, token_data, expires=None):
if not expires:
expires = token_module.default_expire_time()
if not isinstance(expires, unicode):
expires = timeutils.isotime(expires)
token_data['expires'] = expires
token_data['issued_at'] = timeutils.strtime()
def get_token_data(self, user_id, method_names, extras,
domain_id=None, project_id=None, expires=None):
token_data = {'methods': method_names,
'extras': extras}
self._populate_scope(token_data, domain_id, project_id)
self._populate_user(token_data, user_id, domain_id, project_id)
self._populate_roles(token_data, user_id, domain_id, project_id)
self._populate_service_catalog(token_data, user_id, domain_id,
project_id)
self._populate_token(token_data, expires)
return token_data
def recreate_token_data(context, token_data=None, expires=None,
user_ref=None, project_ref=None):
""" Recreate token from an existing token.
Repopulate the ephemeral data and return the new token data.
"""
new_expires = expires
project_id = None
user_id = None
domain_id = None
methods = ['password', 'token']
extras = {}
if token_data:
domain_id = (token_data['domain']['id'] if 'domain' in token_data
else None)
project_id = (token_data['project']['id'] if 'project' in token_data
else None)
if not new_expires:
new_expires = token_data['expires']
user_id = token_data['user']['id']
methods = token_data['methods']
extras = token_data['extras']
else:
project_id = project_ref['id']
user_id = user_ref['id']
token_data_helper = TokenDataHelper(context)
return token_data_helper.get_token_data(user_id,
methods,
extras,
domain_id,
project_id,
new_expires)
def create_token(context, auth_context, auth_info):
token_data_helper = TokenDataHelper(context)
(domain_id, project_id) = auth_info.get_scope()
method_names = list(set(auth_info.get_method_names() +
auth_context.get('method_names', [])))
token_data = token_data_helper.get_token_data(auth_context['user_id'],
method_names,
auth_context['extras'],
domain_id,
project_id,
auth_context.get('expires',
None))
if CONF.signing.token_format == 'UUID':
token_id = uuid.uuid4().hex
elif CONF.signing.token_format == 'PKI':
token_id = cms.cms_sign_token(json.dumps(token_data),
CONF.signing.certfile,
CONF.signing.keyfile)
else:
raise exception.UnexpectedError(
'Invalid value for token_format: %s.'
' Allowed values are PKI or UUID.' %
CONF.signing.token_format)
token_api = token_module.Manager()
try:
expiry = token_data['expires']
if isinstance(expiry, basestring):
expiry = timeutils.parse_isotime(expiry)
role_ids = []
if 'project' in token_data:
# project-scoped token, fill in the v2 token data
# all we care are the role IDs
role_ids = [role['id'] for role in token_data['roles']]
metadata_ref = {'roles': role_ids}
data = dict(key=token_id,
id=token_id,
expires=expiry,
user=token_data['user'],
tenant=token_data.get('project'),
metadata=metadata_ref,
token_data=token_data)
token_api.create_token(context, token_id, data)
except Exception as e:
# an identical token may have been created already.
# if so, return the token_data as it is also identical
try:
token_api.get_token(context=context,
token_id=token_id)
except exception.TokenNotFound:
raise e
return (token_id, token_data)
def render_token_data_response(token_id, token_data):
""" Render token data HTTP response.
Stash token ID into the X-Auth-Token header.
"""
headers = [('X-Subject-Token', token_id)]
headers.append(('Vary', 'X-Auth-Token'))
headers.append(('Content-Type', 'application/json'))
status = (200, 'OK')
body = jsonutils.dumps(token_data, cls=utils.SmarterEncoder)
return webob.Response(body=body,
status='%s %s' % status,
headerlist=headers)

View File

@ -172,3 +172,33 @@ class Catalog(sql.Base, catalog.Driver):
catalog[endpoint['region']][service['type']][interface_url] = url
return catalog
def get_v3_catalog(self, user_id, tenant_id, metadata=None):
d = dict(CONF.iteritems())
d.update({'tenant_id': tenant_id,
'user_id': user_id})
services = {}
for endpoint in self.list_endpoints():
# look up the service
service_id = endpoint['service_id']
services.setdefault(
service_id,
self.get_service(service_id))
service = services[service_id]
del endpoint['service_id']
endpoint['url'] = core.format_url(endpoint['url'], d)
if 'endpoints' in services[service_id]:
services[service_id]['endpoints'].append(endpoint)
else:
services[service_id]['endpoints'] = [endpoint]
catalog = []
for service_id, service in services.iteritems():
formatted_service = {}
formatted_service['id'] = service['id']
formatted_service['type'] = service['type']
formatted_service['endpoints'] = service['endpoints']
catalog.append(formatted_service)
return catalog

View File

@ -213,3 +213,33 @@ class Driver(object):
"""
raise exception.NotImplemented()
def get_v3_catalog(self, user_id, tenant_id, metadata=None):
"""Retrieve and format the current V3 service catalog.
Example::
[
{
"endpoints": [
{
"interface": "public",
"id": "--endpoint-id--",
"region": "RegionOne",
"url": "http://external:8776/v1/--project-id--"
},
{
"interface": "internal",
"id": "--endpoint-id--",
"region": "RegionOne",
"url": "http://internal:8776/v1/--project-id--"
}],
"id": "--service-id--",
"type": "volume"
}]
:returns: A list representing the service catalog or an empty list
:raises: keystone.exception.NotFound
"""
raise exception.NotImplemented()

View File

@ -32,7 +32,7 @@ def protected(f):
LOG.warning(_('RBAC: Invalid token'))
raise exception.Unauthorized()
creds = token_ref['metadata'].copy()
creds = token_ref.get('metadata', {}).copy()
try:
creds['user_id'] = token_ref['user'].get('id')

View File

@ -547,10 +547,11 @@ def render_response(body=None, status=None, headers=None):
def render_exception(error):
"""Forms a WSGI response based on the current error."""
return render_response(status=(error.code, error.title), body={
'error': {
'code': error.code,
'title': error.title,
'message': str(error),
}
})
body = {'error': {
'code': error.code,
'title': error.title,
'message': str(error)
}}
if isinstance(error, exception.AuthPluginException):
body['authentication'] = error.authentication
return render_response(status=(error.code, error.title), body=body)

View File

@ -310,3 +310,11 @@ register_bool('group_allow_delete', group='ldap', default=True)
register_str('url', group='pam', default=None)
register_str('userid', group='pam', default=None)
register_str('password', group='pam', default=None)
# default authentication methods
register_list('methods', group='auth',
default=['password', 'token'])
register_str('password', group='auth',
default='keystone.auth.methods.token.Token')
register_str('token', group='auth',
default='keystone.auth.methods.password.Password')

View File

@ -114,6 +114,24 @@ class Unauthorized(SecurityError):
title = 'Not Authorized'
class AuthPluginException(Unauthorized):
""" Authentication plugin error. """
authentication = {}
class AuthMethodNotSupported(AuthPluginException):
""" Attempted to authenticate with an unsupported method. """
authentication = {'methods': CONF.auth.methods}
class AdditionalAuthRequired(AuthPluginException):
""" Additional authentications steps required. """
def __init__(self, auth_response=None, **kwargs):
super(AdditionalAuthRequired, self).__init__(message=None, **kwargs)
self.authentication = auth_response
class Forbidden(SecurityError):
"""You are not authorized to perform the requested action."""
code = 403

View File

@ -205,8 +205,20 @@ class Identity(sql.Base, identity.Driver):
raise AssertionError('Invalid user / password')
if tenant_id is not None:
# FIXME(gyee): this should really be
# get_roles_for_user_and_project() after the dusts settle
if tenant_id not in self.get_projects_for_user(user_id):
raise AssertionError('Invalid tenant')
# get_roles_for_user_and_project() returns a set
roles = []
try:
roles = self.get_roles_for_user_and_project(user_id,
tenant_id)
except:
# FIXME(gyee): we should never get into this situation
# after user project role migration is completed
pass
if not roles:
raise AssertionError('Invalid tenant')
try:
tenant_ref = self.get_project(tenant_id)
@ -387,14 +399,30 @@ class Identity(sql.Base, identity.Driver):
membership_refs = query.all()
return [x.project_id for x in membership_refs]
def _get_user_group_project_roles(self, metadata_ref, user_id, project_id):
group_refs = self.list_groups_for_user(user_id=user_id)
for x in group_refs:
try:
metadata_ref.update(
self.get_metadata(group_id=x['id'],
tenant_id=project_id))
except exception.MetadataNotFound:
# no group grant, skip
pass
def _get_user_project_roles(self, metadata_ref, user_id, project_id):
try:
metadata_ref.update(self.get_metadata(user_id, project_id))
except exception.MetadataNotFound:
pass
def get_roles_for_user_and_project(self, user_id, tenant_id):
self.get_user(user_id)
self.get_project(tenant_id)
try:
metadata_ref = self.get_metadata(user_id, tenant_id)
except exception.MetadataNotFound:
metadata_ref = {}
return metadata_ref.get('roles', [])
metadata_ref = {}
self._get_user_project_roles(metadata_ref, user_id, tenant_id)
self._get_user_group_project_roles(metadata_ref, user_id, tenant_id)
return list(set(metadata_ref.get('roles', [])))
def add_role_to_user_and_project(self, user_id, tenant_id, role_id):
self.get_user(user_id)

View File

@ -553,7 +553,7 @@ class UserV3(controller.V3Controller):
# revoke all tokens owned by this user
self.token_api.revoke_tokens(
context,
user_id=user['id'])
user_id=ref['id'])
return UserV3.wrap_member(context, ref)

View File

@ -31,6 +31,10 @@ CONF = config.CONF
AUTH_TOKEN_HEADER = 'X-Auth-Token'
# Header used to transmit the subject token
SUBJECT_TOKEN_HEADER = 'X-Subject-Token'
# Environment variable used to pass the request context
CONTEXT_ENV = wsgi.CONTEXT_ENV
@ -44,6 +48,9 @@ class TokenAuthMiddleware(wsgi.Middleware):
token = request.headers.get(AUTH_TOKEN_HEADER)
context = request.environ.get(CONTEXT_ENV, {})
context['token_id'] = token
if SUBJECT_TOKEN_HEADER in request.headers:
context['subject_token_id'] = (
request.headers.get(SUBJECT_TOKEN_HEADER))
request.environ[CONTEXT_ENV] = context

View File

@ -16,6 +16,7 @@
import routes
from keystone import auth
from keystone import catalog
from keystone.common import logging
from keystone.common import wsgi
@ -80,7 +81,7 @@ def v3_app_factory(global_conf, **local_conf):
conf.update(local_conf)
mapper = routes.Mapper()
v3routers = []
for module in [catalog, identity, policy]:
for module in [auth, catalog, identity, policy]:
module.routers.append_v3_routers(mapper, v3routers)
# TODO(ayoung): put token routes here
return wsgi.ComposingRouter(mapper, v3routers)

View File

@ -190,6 +190,7 @@ class TestCase(NoModule, unittest.TestCase):
self.config([etcdir('keystone.conf.sample'),
testsdir('test_overrides.conf')])
self.mox = mox.Mox()
self.opt(policy_file=etcdir('policy.json'))
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(exception, '_FATAL_EXCEPTION_FORMAT_ERRORS', True)

View File

@ -402,19 +402,8 @@ class Auth(controller.V2Controller):
"""
# TODO(termie): this stuff should probably be moved to middleware
self.assert_admin(context)
if cms.is_ans1_token(token_id):
data = json.loads(cms.cms_verify(cms.token_to_cms(token_id),
CONF.signing.certfile,
CONF.signing.ca_certs))
data['access']['token']['user'] = data['access']['user']
data['access']['token']['metadata'] = data['access']['metadata']
if belongs_to:
assert data['access']['token']['tenant']['id'] == belongs_to
token_ref = data['access']['token']
else:
token_ref = self.token_api.get_token(context=context,
token_id=token_id)
token_ref = self.token_api.get_token(context=context,
token_id=token_id)
return token_ref
# admin only

View File

@ -31,6 +31,8 @@ TENANTS = [
'id': 'bar',
'name': 'BAR',
'domain_id': DEFAULT_DOMAIN_ID,
'description': 'description',
'enabled': True,
}, {
'id': 'baz',
'name': 'BAZ',
@ -53,7 +55,9 @@ USERS = [
'name': 'FOO',
'domain_id': DEFAULT_DOMAIN_ID,
'password': 'foo2',
'tenants': ['bar']
'tenants': ['bar'],
'enabled': True,
'email': 'foo@bar.com',
}, {
'id': 'two',
'name': 'TWO',
@ -63,6 +67,7 @@ USERS = [
'enabled': True,
'tenant_id': 'baz',
'tenants': ['baz'],
'email': 'two@three.com',
}, {
'id': 'badguy',
'name': 'BadGuy',
@ -72,13 +77,15 @@ USERS = [
'enabled': False,
'tenant_id': 'baz',
'tenants': ['baz'],
'email': 'badguy@goodguy.com',
}, {
'id': 'sna',
'name': 'SNA',
'domain_id': DEFAULT_DOMAIN_ID,
'password': 'snafu',
'enabled': True,
'tenants': ['bar']
'tenants': ['bar'],
'email': 'sna@snl.coom',
}
]
@ -91,8 +98,8 @@ METADATA = [
ROLES = [
{
'id': 'keystone_admin',
'name': 'Keystone Admin',
'id': 'admin',
'name': 'admin',
}, {
'id': 'member',
'name': 'Member',

View File

@ -1,3 +0,0 @@
{
"admin_required": [["role:Keystadasd"], ["is_admin:1"]]
}

View File

@ -245,7 +245,7 @@ class AuthWithToken(AuthTest):
self.identity_api.create_grant(
group_id=new_group['id'],
project_id=self.tenant_bar['id'],
role_id=self.role_keystone_admin['id'])
role_id=self.role_admin['id'])
# Get a scoped token for the tenant
body_dict = _build_user_auth(
@ -259,7 +259,7 @@ class AuthWithToken(AuthTest):
roles = scoped_token["access"]["metadata"]["roles"]
self.assertEquals(tenant["id"], self.tenant_bar['id'])
self.assertIn(self.role_member['id'], roles)
self.assertIn(self.role_keystone_admin['id'], roles)
self.assertIn(self.role_admin['id'], roles)
def test_auth_token_cross_domain_group_and_project(self):
"""Verify getting a token in cross domain group/project roles"""
@ -291,7 +291,7 @@ class AuthWithToken(AuthTest):
self.identity_api.create_grant(
group_id=new_group['id'],
project_id=project1['id'],
role_id=self.role_keystone_admin['id'])
role_id=self.role_admin['id'])
self.identity_api.create_grant(
user_id=self.user_foo['id'],
domain_id=domain1['id'],
@ -312,7 +312,7 @@ class AuthWithToken(AuthTest):
roles = scoped_token["access"]["metadata"]["roles"]
self.assertEquals(tenant["id"], project1['id'])
self.assertIn(self.role_member['id'], roles)
self.assertIn(self.role_keystone_admin['id'], roles)
self.assertIn(self.role_admin['id'], roles)
self.assertNotIn(role_foo_domain1['id'], roles)
self.assertNotIn(role_group_domain1['id'], roles)

View File

@ -0,0 +1,3 @@
[auth]
methods = password,token,simple-challenge-response
simple-challenge-response = challenge_response_method.SimpleChallengeResponse

101
tests/test_auth_plugin.py Normal file
View File

@ -0,0 +1,101 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from keystone.common import logging
from keystone import auth
from keystone import config
from keystone import exception
from keystone import test
# for testing purposes only
METHOD_NAME = 'simple-challenge-response'
EXPECTED_RESPONSE = uuid.uuid4().hex
DEMO_USER_ID = uuid.uuid4().hex
class SimpleChallengeResponse(auth.AuthMethodHandler):
def authenticate(self, context, auth_payload, user_context):
if 'response' in auth_payload:
if auth_payload['response'] != EXPECTED_RESPONSE:
raise exception.Unauthorized('Wrong answer')
user_context['user_id'] = DEMO_USER_ID
else:
return {"challenge": "What's the name of your high school?"}
class TestAuthPlugin(test.TestCase):
def setUp(self):
super(TestAuthPlugin, self).setUp()
self.config([
test.etcdir('keystone.conf.sample'),
test.testsdir('test_overrides.conf'),
test.testsdir('backend_sql.conf'),
test.testsdir('backend_sql_disk.conf'),
test.testsdir('test_auth_plugin.conf')])
self.load_backends()
auth.controllers.AUTH_METHODS[METHOD_NAME] = SimpleChallengeResponse()
self.api = auth.controllers.Auth()
def test_unsupported_auth_method(self):
method_name = uuid.uuid4().hex
auth_data = {'methods': [method_name]}
auth_data[method_name] = {'test': 'test'}
auth_data = {'authentication': auth_data}
self.assertRaises(exception.AuthMethodNotSupported,
auth.controllers.AuthInfo,
None,
auth_data)
def test_addition_auth_steps(self):
auth_data = {'methods': ['simple-challenge-response']}
auth_data['simple-challenge-response'] = {
'test': 'test'}
auth_data = {'authentication': auth_data}
auth_info = auth.controllers.AuthInfo(None, auth_data)
auth_context = {'extras': {}, 'method_names': []}
try:
self.api.authenticate({}, auth_info, auth_context)
except exception.AdditionalAuthRequired as e:
self.assertTrue('methods' in e.authentication)
self.assertTrue(METHOD_NAME in e.authentication['methods'])
self.assertTrue(METHOD_NAME in e.authentication)
self.assertTrue('challenge' in e.authentication[METHOD_NAME])
# test correct response
auth_data = {'methods': ['simple-challenge-response']}
auth_data['simple-challenge-response'] = {
'response': EXPECTED_RESPONSE}
auth_data = {'authentication': auth_data}
auth_info = auth.controllers.AuthInfo(None, auth_data)
auth_context = {'extras': {}, 'method_names': []}
self.api.authenticate({}, auth_info, auth_context)
self.assertEqual(auth_context['user_id'], DEMO_USER_ID)
# test incorrect response
auth_data = {'methods': ['simple-challenge-response']}
auth_data['simple-challenge-response'] = {
'response': uuid.uuid4().hex}
auth_data = {'authentication': auth_data}
auth_info = auth.controllers.AuthInfo(None, auth_data)
auth_context = {'extras': {}, 'method_names': []}
self.assertRaises(exception.Unauthorized,
self.api.authenticate,
{},
auth_info,
auth_context)

View File

@ -101,13 +101,13 @@ class IdentityTests(object):
def test_authenticate_role_return(self):
self.identity_api.add_role_to_user_and_project(
self.user_foo['id'], self.tenant_baz['id'], 'keystone_admin')
self.user_foo['id'], self.tenant_baz['id'], self.role_admin['id'])
user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate(
user_id=self.user_foo['id'],
tenant_id=self.tenant_baz['id'],
password=self.user_foo['password'])
self.assertIn('roles', metadata_ref)
self.assertIn('keystone_admin', metadata_ref['roles'])
self.assertIn(self.role_admin['id'], metadata_ref['roles'])
def test_authenticate_no_metadata(self):
user = {
@ -223,9 +223,9 @@ class IdentityTests(object):
def test_get_role(self):
role_ref = self.identity_api.get_role(
role_id=self.role_keystone_admin['id'])
role_id=self.role_admin['id'])
role_ref_dict = dict((x, role_ref[x]) for x in role_ref)
self.assertDictEqual(role_ref_dict, self.role_keystone_admin)
self.assertDictEqual(role_ref_dict, self.role_admin)
def test_get_role_404(self):
self.assertRaises(exception.RoleNotFound,
@ -469,31 +469,31 @@ class IdentityTests(object):
def test_add_duplicate_role_grant(self):
roles_ref = self.identity_api.get_roles_for_user_and_project(
self.user_foo['id'], self.tenant_bar['id'])
self.assertNotIn('keystone_admin', roles_ref)
self.assertNotIn(self.role_admin['id'], roles_ref)
self.identity_api.add_role_to_user_and_project(
self.user_foo['id'], self.tenant_bar['id'], 'keystone_admin')
self.user_foo['id'], self.tenant_bar['id'], self.role_admin['id'])
self.assertRaises(exception.Conflict,
self.identity_api.add_role_to_user_and_project,
self.user_foo['id'],
self.tenant_bar['id'],
'keystone_admin')
self.role_admin['id'])
def test_get_role_by_user_and_project(self):
roles_ref = self.identity_api.get_roles_for_user_and_project(
self.user_foo['id'], self.tenant_bar['id'])
self.assertNotIn('keystone_admin', roles_ref)
self.assertNotIn(self.role_admin['id'], roles_ref)
self.identity_api.add_role_to_user_and_project(
self.user_foo['id'], self.tenant_bar['id'], 'keystone_admin')
self.user_foo['id'], self.tenant_bar['id'], self.role_admin['id'])
roles_ref = self.identity_api.get_roles_for_user_and_project(
self.user_foo['id'], self.tenant_bar['id'])
self.assertIn('keystone_admin', roles_ref)
self.assertIn(self.role_admin['id'], roles_ref)
self.assertNotIn('member', roles_ref)
self.identity_api.add_role_to_user_and_project(
self.user_foo['id'], self.tenant_bar['id'], 'member')
roles_ref = self.identity_api.get_roles_for_user_and_project(
self.user_foo['id'], self.tenant_bar['id'])
self.assertIn('keystone_admin', roles_ref)
self.assertIn(self.role_admin['id'], roles_ref)
self.assertIn('member', roles_ref)
def test_get_roles_for_user_and_project_404(self):
@ -512,13 +512,13 @@ class IdentityTests(object):
self.identity_api.add_role_to_user_and_project,
uuid.uuid4().hex,
self.tenant_bar['id'],
'keystone_admin')
self.role_admin['id'])
self.assertRaises(exception.ProjectNotFound,
self.identity_api.add_role_to_user_and_project,
self.user_foo['id'],
uuid.uuid4().hex,
'keystone_admin')
self.role_admin['id'])
self.assertRaises(exception.RoleNotFound,
self.identity_api.add_role_to_user_and_project,
@ -547,11 +547,12 @@ class IdentityTests(object):
self.assertEquals(len(roles_ref), 1)
self.identity_api.create_grant(user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'],
role_id='keystone_admin')
role_id=self.role_admin['id'])
roles_ref = self.identity_api.list_grants(
user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'])
self.assertDictEqual(roles_ref[1], self.role_keystone_admin)
self.assertIn(self.role_admin['id'],
[role_ref['id'] for role_ref in roles_ref])
self.identity_api.create_grant(user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'],
@ -563,7 +564,7 @@ class IdentityTests(object):
roles_ref_ids = []
for i, ref in enumerate(roles_ref):
roles_ref_ids.append(ref['id'])
self.assertIn('keystone_admin', roles_ref_ids)
self.assertIn(self.role_admin['id'], roles_ref_ids)
self.assertIn('member', roles_ref_ids)
def test_get_role_grants_for_user_and_project_404(self):
@ -582,13 +583,13 @@ class IdentityTests(object):
self.identity_api.create_grant,
user_id=uuid.uuid4().hex,
project_id=self.tenant_bar['id'],
role_id='keystone_admin')
role_id=self.role_admin['id'])
self.assertRaises(exception.ProjectNotFound,
self.identity_api.create_grant,
user_id=self.user_foo['id'],
project_id=uuid.uuid4().hex,
role_id='keystone_admin')
role_id=self.role_admin['id'])
self.assertRaises(exception.RoleNotFound,
self.identity_api.create_grant,
@ -730,13 +731,13 @@ class IdentityTests(object):
self.identity_api.create_grant(group_id=new_group2['id'],
domain_id=new_domain['id'],
role_id='keystone_admin')
role_id=self.role_admin['id'])
self.identity_api.create_grant(user_id=new_user2['id'],
domain_id=new_domain['id'],
role_id='keystone_admin')
role_id=self.role_admin['id'])
self.identity_api.create_grant(group_id=new_group['id'],
project_id=new_project['id'],
role_id='keystone_admin')
role_id=self.role_admin['id'])
roles_ref = self.identity_api.list_grants(
group_id=new_group['id'],

View File

@ -73,7 +73,7 @@ class RestfulTestCase(test.TestCase):
self.metadata_foobar = self.identity_api.update_metadata(
self.user_foo['id'],
self.tenant_bar['id'],
dict(roles=['keystone_admin'], is_admin='1'))
dict(roles=[self.role_admin['id']], is_admin='1'))
def tearDown(self):
"""Kill running servers and release references to avoid leaks."""
@ -180,7 +180,8 @@ class RestfulTestCase(test.TestCase):
elif self.content_type == 'xml':
response.body = etree.fromstring(response.body)
def restful_request(self, headers=None, body=None, token=None, **kwargs):
def restful_request(self, method='GET', headers=None, body=None,
token=None, **kwargs):
"""Serializes/deserializes json/xml as request/response body.
.. WARNING::
@ -198,12 +199,13 @@ class RestfulTestCase(test.TestCase):
body = self._to_content_type(body, headers)
# Perform the HTTP request/response
response = self.request(headers=headers, body=body, **kwargs)
response = self.request(method=method, headers=headers, body=body,
**kwargs)
self._from_content_type(response)
# we can save some code & improve coverage by always doing this
if response.status >= 400:
if method != 'HEAD' and response.status >= 400:
self.assertValidErrorResponse(response)
# Contains the decoded response.body

View File

@ -53,7 +53,7 @@ class CompatTestCase(test.TestCase):
# override the fixtures, for now
self.metadata_foobar = self.identity_api.update_metadata(
self.user_foo['id'], self.tenant_bar['id'],
dict(roles=['keystone_admin'], is_admin='1'))
dict(roles=[self.role_admin['id']], is_admin='1'))
def tearDown(self):
self.public_server.kill()
@ -536,8 +536,8 @@ class KeystoneClientTests(object):
def test_role_get(self):
client = self.get_client(admin=True)
role = client.roles.get(role='keystone_admin')
self.assertEquals(role.id, 'keystone_admin')
role = client.roles.get(role=self.role_admin['id'])
self.assertEquals(role.id, self.role_admin['id'])
def test_role_crud(self):
from keystoneclient import exceptions as client_exceptions
@ -784,7 +784,7 @@ class KeystoneClientTests(object):
# ROLE CRUD
self.assertRaises(exception,
two.roles.get,
role='keystone_admin')
role=self.role_admin['id'])
self.assertRaises(exception,
two.roles.list)
self.assertRaises(exception,
@ -792,7 +792,7 @@ class KeystoneClientTests(object):
name='oops')
self.assertRaises(exception,
two.roles.delete,
role='keystone_admin')
role=self.role_admin['id'])
# TODO(ja): MEMBERSHIP CRUD
# TODO(ja): determine what else todo

View File

@ -1,6 +1,7 @@
import uuid
from keystone.common.sql import util as sql_util
from keystone import auth
from keystone import test
from keystone import config
@ -19,6 +20,34 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
test.testsdir('backend_sql_disk.conf')])
sql_util.setup_test_database()
self.load_backends()
self.domain_id = uuid.uuid4().hex
self.domain = self.new_domain_ref()
self.domain['id'] = self.domain_id
self.identity_api.create_domain(self.domain_id, self.domain)
self.project_id = uuid.uuid4().hex
self.project = self.new_project_ref(
domain_id=self.domain_id)
self.project['id'] = self.project_id
self.identity_api.create_project(self.project_id, self.project)
self.user_id = uuid.uuid4().hex
self.user = self.new_user_ref(
domain_id=self.domain_id,
project_id=self.project_id)
self.user['id'] = self.user_id
self.identity_api.create_user(self.user_id, self.user)
# create & grant policy.json's default role for admin_required
self.role_id = uuid.uuid4().hex
self.role = self.new_role_ref()
self.role['id'] = self.role_id
self.role['name'] = 'admin'
self.identity_api.create_role(self.role_id, self.role)
self.identity_api.add_role_to_user_and_project(
self.user_id, self.project_id, self.role_id)
self.public_server = self.serveapp('keystone', name='main')
self.admin_server = self.serveapp('keystone', name='admin')
@ -28,6 +57,8 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
self.public_server = None
self.admin_server = None
sql_util.teardown_test_database()
# need to reset the plug-ins
auth.controllers.AUTH_METHODS = {}
def new_ref(self):
"""Populates a ref with attributes common to all API entities."""
@ -62,6 +93,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
ref = self.new_ref()
ref['domain_id'] = domain_id
ref['email'] = uuid.uuid4().hex
ref['password'] = uuid.uuid4().hex
if project_id:
ref['project_id'] = project_id
return ref
@ -92,22 +124,29 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
def get_scoped_token(self):
"""Convenience method so that we can test authenticated requests."""
# FIXME(dolph): should use real auth
return 'ADMIN'
r = self.admin_request(
method='POST',
path='/v3/tokens',
path='/v3/auth/tokens',
body={
'auth': {
'passwordCredentials': {
'username': self.user_foo['name'],
'password': self.user_foo['password'],
},
'tenantId': self.tenant_bar['id'],
'authentication': {
'methods': ['password'],
'password': {
'user': {
'name': self.user['name'],
'password': self.user['password'],
'domain': {
'id': self.user['domain_id']
}
}
}
},
'scope': {
'project': {
'id': self.project['id'],
}
}
})
return r.body['access']['token']['id']
return r.getheader('X-Subject-Token')
def v3_request(self, path, **kwargs):
path = '/v3' + path
@ -134,6 +173,13 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
def delete(self, path, **kwargs):
return self.v3_request(method='DELETE', path=path, **kwargs)
def assertValidErrorResponse(self, r):
self.assertIsNotNone(r.body.get('error'))
self.assertIsNotNone(r.body['error'].get('code'))
self.assertIsNotNone(r.body['error'].get('title'))
self.assertIsNotNone(r.body['error'].get('message'))
self.assertEqual(r.body['error']['code'], r.status)
def assertValidListResponse(self, resp, key, entity_validator, ref=None,
expected_length=None):
"""Make assertions common to all API list responses.

440
tests/test_v3_auth.py Normal file
View File

@ -0,0 +1,440 @@
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from keystone import auth
from keystone import config
from keystone import exception
from keystone.openstack.common import timeutils
from keystone import test
import test_v3
CONF = config.CONF
def _build_auth_scope(project_id=None, project_name=None,
project_domain_id=None, project_domain_name=None,
domain_id=None, domain_name=None):
scope_data = {}
if project_id or project_name:
scope_data['project'] = {}
if project_id:
scope_data['project']['id'] = project_id
else:
scope_data['project']['name'] = project_name
if project_domain_id or project_domain_name:
project_domain_json = {}
if project_domain_id:
project_domain_json['id'] = project_domain_id
else:
project_domain_json['name'] = project_domain_name
scope_data['project']['domain'] = project_domain_json
if domain_id or domain_name:
scope_data['domain'] = {}
if domain_id:
scope_data['domain']['id'] = domain_id
else:
scope_data['domain']['name'] = domain_name
return scope_data
def _build_password_auth(user_id=None, username=None,
user_domain_id=None, user_domain_name=None,
password=None):
password_data = {'user': {}}
if user_id:
password_data['user']['id'] = user_id
else:
password_data['user']['name'] = username
if user_domain_id or user_domain_name:
password_data['user']['domain'] = {}
if user_domain_id:
password_data['user']['domain']['id'] = user_domain_id
else:
password_data['user']['domain']['name'] = user_domain_name
password_data['user']['password'] = password
return password_data
def _build_token_auth(token):
return {'id': token}
def _build_authentication_request(token=None, user_id=None, username=None,
user_domain_id=None, user_domain_name=None,
password=None, project_id=None,
project_name=None, project_domain_id=None,
project_domain_name=None,
domain_id=None, domain_name=None):
"""Build auth dictionary.
It will create an auth dictionary based on all the arguments
that it receives.
"""
auth_data = {}
auth_data['authentication'] = {'methods': []}
if token:
auth_data['authentication']['methods'].append('token')
auth_data['authentication']['token'] = _build_token_auth(token)
if user_id or username:
auth_data['authentication']['methods'].append('password')
auth_data['authentication']['password'] = _build_password_auth(
user_id, username, user_domain_id, user_domain_name, password)
if project_id or project_name or domain_id or domain_name:
auth_data['scope'] = _build_auth_scope(project_id,
project_name,
project_domain_id,
project_domain_name,
domain_id,
domain_name)
return auth_data
class AuthTest(test_v3.RestfulTestCase):
def assertValidToken(self, token):
self.assertNotIn('roles', token)
self.assertEqual(self.user['id'], token['user']['id'])
self.assertIn('expires', token)
def assertValidScopedToken(self, token):
self.assertIn('roles', token)
self.assertIn('expires', token)
self.assertIn('catalog', token)
self.assertIn('user', token)
self.assertTrue(token['roles'])
for role in token['roles']:
self.assertIn('id', role)
self.assertIn('name', role)
self.assertEqual(self.user['id'], token['user']['id'])
self.assertEqual(self.user['name'], token['user']['name'])
self.assertEqual(self.user['domain_id'], token['user']['domain']['id'])
self.assertEqual(self.role_id, token['roles'][0]['id'])
def assertValidProjectScopedToken(self, token):
self.assertValidScopedToken(token)
self.assertIn('project', token)
self.assertIn('id', token['project'])
self.assertIn('name', token['project'])
self.assertIn('domain', token['project'])
self.assertIn('id', token['project']['domain'])
self.assertIn('name', token['project']['domain'])
def assertValidDomainScopedToken(self, token):
self.assertValidScopedToken(token)
self.assertIn('domain', token)
self.assertIn('id', token['domain'])
self.assertIn('name', token['domain'])
def assertValidProjectScopedToken(self, token):
self.assertNotEqual([], token['roles'])
self.assertEqual(self.user['id'], token['user']['id'])
self.assertEqual(self.role_id, token['roles'][0]['id'])
def assertEqualTokens(self, a, b):
"""Assert that two tokens are equal.
Compare two tokens except for their ids. This also truncates
the time in the comparison.
"""
def normalize(token):
del token['expires']
del token['issued_at']
return token
self.assertCloseEnoughForGovernmentWork(
timeutils.parse_isotime(a['expires']),
timeutils.parse_isotime(b['expires']))
self.assertCloseEnoughForGovernmentWork(
timeutils.parse_isotime(a['issued_at']),
timeutils.parse_isotime(b['issued_at']))
return self.assertDictEqual(normalize(a), normalize(b))
class TestAuthInfo(test.TestCase):
def setUp(self):
super(TestAuthInfo, self).setUp()
def test_missing_auth_methods(self):
auth_data = {'authentication': {}}
auth_data['authentication']['token'] = {'id': uuid.uuid4().hex}
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo,
None,
auth_data)
def test_unsupported_auth_method(self):
auth_data = {'methods': ['abc']}
auth_data['abc'] = {'test': 'test'}
auth_data = {'authentication': auth_data}
self.assertRaises(exception.AuthMethodNotSupported,
auth.controllers.AuthInfo,
None,
auth_data)
def test_missing_auth_method_data(self):
auth_data = {'methods': ['password']}
auth_data = {'authentication': auth_data}
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo,
None,
auth_data)
def test_project_name_no_domain(self):
auth_data = _build_authentication_request(username='test',
password='test',
project_name='abc')
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo,
None,
auth_data)
def test_both_project_and_domain_in_scope(self):
auth_data = _build_authentication_request(user_id='test',
password='test',
project_name='test',
domain_name='test')
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo,
None,
auth_data)
class TestTokenAPIs(AuthTest):
def setUp(self):
super(TestTokenAPIs, self).setUp()
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain_id,
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data)
self.token_data = resp.body
self.token = resp.getheader('X-Subject-Token')
self.headers = {'X-Subject-Token': resp.getheader('X-Subject-Token')}
def test_default_fixture_scope_token(self):
self.assertIsNotNone(self.get_scoped_token())
def test_v3_v2_uuid_token_intermix(self):
# FIXME(gyee): PKI tokens are not interchangeable because token
# data is baked into the token itself.
self.opt_in_group('signing', token_format='UUID')
auth_data = _build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
resp = self.post('/auth/tokens', body=auth_data)
token_data = resp.body
token = resp.getheader('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
token='ADMIN',
method='GET')
v2_token = resp.body
self.assertEqual(v2_token['access']['user']['id'],
token_data['user']['id'])
self.assertEqual(v2_token['access']['token']['expires'],
token_data['expires'])
self.assertEqual(v2_token['access']['user']['roles'][0]['id'],
token_data['roles'][0]['id'])
def test_v3_v2_pki_token_intermix(self):
# FIXME(gyee): PKI tokens are not interchangeable because token
# data is baked into the token itself.
self.opt_in_group('signing', token_format='PKI')
auth_data = _build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
resp = self.post('/auth/tokens', body=auth_data)
token_data = resp.body
token = resp.getheader('X-Subject-Token')
# now validate the v3 token with v2 API
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
token='ADMIN',
method='GET')
v2_token = resp.body
self.assertEqual(v2_token['access']['user']['id'],
token_data['user']['id'])
self.assertEqual(v2_token['access']['token']['expires'],
token_data['expires'])
self.assertEqual(v2_token['access']['user']['roles'][0]['id'],
token_data['roles'][0]['id'])
def test_v2_v3_uuid_token_intermix(self):
self.opt_in_group('signing', token_format='UUID')
body = {
'auth': {
'passwordCredentials': {
'userId': self.user['id'],
'password': self.user['password']
},
'tenantId': self.project['id']
}}
resp = self.admin_request(path='/v2.0/tokens',
method='POST',
body=body)
v2_token_data = resp.body
v2_token = v2_token_data['access']['token']['id']
headers = {'X-Subject-Token': v2_token}
resp = self.get('/auth/tokens', headers=headers)
token_data = resp.body
self.assertEqual(v2_token_data['access']['user']['id'],
token_data['user']['id'])
self.assertEqual(v2_token_data['access']['token']['expires'],
token_data['expires'])
self.assertEqual(v2_token_data['access']['user']['roles'][0]['name'],
token_data['roles'][0]['name'])
def test_v2_v3_pki_token_intermix(self):
self.opt_in_group('signing', token_format='PKI')
body = {
'auth': {
'passwordCredentials': {
'userId': self.user['id'],
'password': self.user['password']
},
'tenantId': self.project['id']
}}
resp = self.admin_request(path='/v2.0/tokens',
method='POST',
body=body)
v2_token_data = resp.body
v2_token = v2_token_data['access']['token']['id']
headers = {'X-Subject-Token': v2_token}
resp = self.get('/auth/tokens', headers=headers)
token_data = resp.body
self.assertEqual(v2_token_data['access']['user']['id'],
token_data['user']['id'])
self.assertEqual(v2_token_data['access']['token']['expires'],
token_data['expires'])
self.assertEqual(v2_token_data['access']['user']['roles'][0]['name'],
token_data['roles'][0]['name'])
def test_rescoping_token(self):
expires = self.token_data['expires']
auth_data = _build_authentication_request(
token=self.token,
project_id=self.project_id)
resp = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectScopedToken(resp.body)
# make sure expires stayed the same
self.assertEqual(expires, resp.body['expires'])
def test_check_token(self):
resp = self.head('/auth/tokens', headers=self.headers)
self.assertEqual(resp.status, 204)
def test_validate_token(self):
resp = self.get('/auth/tokens', headers=self.headers)
self.assertValidToken(resp.body)
def test_revoke_token(self):
token = self.get_scoped_token()
headers = {'X-Subject-Token': token}
self.delete('/auth/tokens', headers=headers)
# make sure token no longer valid
resp = self.head('/auth/tokens', headers=headers,
expected_status=401)
self.assertEqual(resp.status, 401)
# make sure we have a CRL
resp = self.get('/auth/tokens/OS-PKI/revoked')
self.assertTrue('signed' in resp.body)
class TestAuth(AuthTest):
def test_unscope_token_with_name(self):
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain_id,
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data)
self.assertValidToken(resp.body)
def test_project_scope_token_with_name(self):
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain_id,
password=self.user['password'],
project_id=self.project_id)
resp = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectScopedToken(resp.body)
def test_auth_with_id(self):
auth_data = _build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data)
self.assertValidToken(resp.body)
token = resp.getheader('X-Subject-Token')
headers = {'X-Subject-Token': resp.getheader('X-Subject-Token')}
# test token auth
auth_data = _build_authentication_request(token=token)
resp = self.post('/auth/tokens', body=auth_data)
self.assertValidToken(resp.body)
def test_invalid_password(self):
auth_data = _build_authentication_request(
user_id=self.user['id'],
password=uuid.uuid4().hex)
resp = self.post('/auth/tokens', body=auth_data,
expected_status=401)
self.assertEqual(resp.status, 401)
def test_invalid_username(self):
auth_data = _build_authentication_request(
username=uuid.uuid4().hex,
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data,
expected_status=401)
self.assertEqual(resp.status, 401)
def test_remote_user(self):
auth_data = _build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
api = auth.controllers.Auth()
context = {'REMOTE_USER': self.user['name']}
auth_info = auth.controllers.AuthInfo(None, auth_data)
auth_context = {'extras': {}, 'method_names': []}
api.authenticate(context, auth_info, auth_context)
self.assertEqual(auth_context['user_id'], self.user['id'])
def test_remote_user_no_domain(self):
auth_data = _build_authentication_request(
username=self.user['name'],
password=self.user['password'])
api = auth.controllers.Auth()
context = {'REMOTE_USER': self.user['name']}
auth_info = auth.controllers.AuthInfo(None, auth_data)
auth_context = {'extras': {}, 'method_names': []}
self.assertRaises(exception.ValidationError,
api.authenticate,
context,
auth_info,
auth_context)

View File

@ -9,24 +9,6 @@ class IdentityTestCase(test_v3.RestfulTestCase):
def setUp(self):
super(IdentityTestCase, self).setUp()
self.domain_id = uuid.uuid4().hex
self.domain = self.new_domain_ref()
self.domain['id'] = self.domain_id
self.identity_api.create_domain(self.domain_id, self.domain)
self.project_id = uuid.uuid4().hex
self.project = self.new_project_ref(
domain_id=self.domain_id)
self.project['id'] = self.project_id
self.identity_api.create_project(self.project_id, self.project)
self.user_id = uuid.uuid4().hex
self.user = self.new_user_ref(
domain_id=self.domain_id,
project_id=self.project_id)
self.user['id'] = self.user_id
self.identity_api.create_user(self.user_id, self.user)
self.group_id = uuid.uuid4().hex
self.group = self.new_group_ref(
domain_id=self.domain_id)
@ -35,18 +17,13 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.credential_id = uuid.uuid4().hex
self.credential = self.new_credential_ref(
user_id=self.user_id,
user_id=self.user['id'],
project_id=self.project_id)
self.credential['id'] = self.credential_id
self.identity_api.create_credential(
self.credential_id,
self.credential)
self.role_id = uuid.uuid4().hex
self.role = self.new_role_ref()
self.role['id'] = self.role_id
self.identity_api.create_role(self.role_id, self.role)
# domain validation
def assertValidDomainListResponse(self, resp, **kwargs):
@ -225,15 +202,17 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertValidDomainResponse(r, self.domain)
# check that the project and user are still enabled
r = self.get('/projects/%(project_id)s' % {
'project_id': self.project_id})
self.assertValidProjectResponse(r, self.project)
self.assertTrue(r.body['project']['enabled'])
# FIXME(gyee): are these tests still valid since user should not
# be able to authenticate into a disabled domain
#r = self.get('/projects/%(project_id)s' % {
# 'project_id': self.project_id})
#self.assertValidProjectResponse(r, self.project)
#self.assertTrue(r.body['project']['enabled'])
r = self.get('/users/%(user_id)s' % {
'user_id': self.user_id})
self.assertValidUserResponse(r, self.user)
self.assertTrue(r.body['user']['enabled'])
#r = self.get('/users/%(user_id)s' % {
# 'user_id': self.user['id']})
#self.assertValidUserResponse(r, self.user)
#self.assertTrue(r.body['user']['enabled'])
# TODO(dolph): assert that v2 & v3 auth return 401
@ -298,25 +277,25 @@ class IdentityTestCase(test_v3.RestfulTestCase):
def test_get_user(self):
"""GET /users/{user_id}"""
r = self.get('/users/%(user_id)s' % {
'user_id': self.user_id})
'user_id': self.user['id']})
self.assertValidUserResponse(r, self.user)
def test_add_user_to_group(self):
"""PUT /groups/{group_id}/users/{user_id}"""
self.put('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group_id, 'user_id': self.user_id})
'group_id': self.group_id, 'user_id': self.user['id']})
def test_check_user_in_group(self):
"""HEAD /groups/{group_id}/users/{user_id}"""
self.put('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group_id, 'user_id': self.user_id})
'group_id': self.group_id, 'user_id': self.user['id']})
self.head('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group_id, 'user_id': self.user_id})
'group_id': self.group_id, 'user_id': self.user['id']})
def test_list_users_in_group(self):
"""GET /groups/{group_id}/users"""
r = self.put('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group_id, 'user_id': self.user_id})
'group_id': self.group_id, 'user_id': self.user['id']})
r = self.get('/groups/%(group_id)s/users' % {
'group_id': self.group_id})
self.assertValidUserListResponse(r, ref=self.user)
@ -326,23 +305,23 @@ class IdentityTestCase(test_v3.RestfulTestCase):
def test_remove_user_from_group(self):
"""DELETE /groups/{group_id}/users/{user_id}"""
self.put('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group_id, 'user_id': self.user_id})
'group_id': self.group_id, 'user_id': self.user['id']})
self.delete('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group_id, 'user_id': self.user_id})
'group_id': self.group_id, 'user_id': self.user['id']})
def test_update_user(self):
"""PATCH /users/{user_id}"""
user = self.new_user_ref(domain_id=self.domain_id)
del user['id']
r = self.patch('/users/%(user_id)s' % {
'user_id': self.user_id},
'user_id': self.user['id']},
body={'user': user})
self.assertValidUserResponse(r, user)
def test_delete_user(self):
"""DELETE /users/{user_id}"""
self.delete('/users/%(user_id)s' % {
'user_id': self.user_id})
'user_id': self.user['id']})
# group crud tests
@ -388,7 +367,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
def test_create_credential(self):
"""POST /credentials"""
ref = self.new_credential_ref(user_id=self.user_id)
ref = self.new_credential_ref(user_id=self.user['id'])
r = self.post(
'/credentials',
body={'credential': ref})
@ -404,7 +383,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
def test_update_credential(self):
"""PATCH /credentials/{credential_id}"""
ref = self.new_credential_ref(
user_id=self.user_id,
user_id=self.user['id'],
project_id=self.project_id)
del ref['id']
r = self.patch(
@ -457,8 +436,8 @@ class IdentityTestCase(test_v3.RestfulTestCase):
def test_crud_user_project_role_grants(self):
collection_url = (
'/projects/%(project_id)s/users/%(user_id)s/roles' % {
'project_id': self.project_id,
'user_id': self.user_id})
'project_id': self.project['id'],
'user_id': self.user['id']})
member_url = '%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
'role_id': self.role_id}
@ -469,16 +448,18 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertValidRoleListResponse(r, ref=self.role)
self.assertIn(collection_url, r.body['links']['self'])
self.delete(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, expected_length=0)
self.assertIn(collection_url, r.body['links']['self'])
# FIXME(gyee): this test is no longer valid as user
# have no role in the project. Can't get a scoped token
#self.delete(member_url)
#r = self.get(collection_url)
#self.assertValidRoleListResponse(r, expected_length=0)
#self.assertIn(collection_url, r.body['links']['self'])
def test_crud_user_domain_role_grants(self):
collection_url = (
'/domains/%(domain_id)s/users/%(user_id)s/roles' % {
'domain_id': self.domain_id,
'user_id': self.user_id})
'user_id': self.user['id']})
member_url = '%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
'role_id': self.role_id}