Implement Token Binding.
Brings token binding to keystone server. There are a number of places where the location or hardcoding of binding checks are not optimal however fixing them will require having a proper authentication plugin scheme so just assume that they will be moved when that happens. DocImpact Implements: blueprint authentication-tied-to-token Change-Id: Ib34e5e0b6bd83837f6addbd45d4c5b828ce2f3bd
This commit is contained in:
parent
53a03b53e7
commit
2667c772a3
doc/source
etc
keystone
auth
common
token
tests
@ -506,6 +506,44 @@ default, but can be enabled by including the following in ``keystone.conf``.
|
||||
enabled = True
|
||||
|
||||
|
||||
Token Binding
|
||||
-------------
|
||||
|
||||
Token binding refers to the practice of embedding information from external
|
||||
authentication providers (like a company's Kerberos server) inside the token
|
||||
such that a client may enforce that the token only be used in conjunction with
|
||||
that specified authentication. This is an additional security mechanism as it
|
||||
means that if a token is stolen it will not be usable without also providing the
|
||||
external authentication.
|
||||
|
||||
To activate token binding you must specify the types of authentication that
|
||||
token binding should be used for in ``keystone.conf`` e.g.::
|
||||
|
||||
[token]
|
||||
bind = kerberos
|
||||
|
||||
Currently only ``kerberos`` is supported.
|
||||
|
||||
To enforce checking of token binding the ``enforce_token_bind`` parameter
|
||||
should be set to one of the following modes:
|
||||
|
||||
* ``disabled`` disable token bind checking
|
||||
* ``permissive`` enable bind checking, if a token is bound to a mechanism that
|
||||
is unknown to the server then ignore it. This is the default.
|
||||
* ``strict`` enable bind checking, if a token is bound to a mechanism that is
|
||||
unknown to the server then this token should be rejected.
|
||||
* ``required`` enable bind checking and require that at least 1 bind mechanism
|
||||
is used for tokens.
|
||||
* named enable bind checking and require that the specified authentication
|
||||
mechanism is used. e.g.::
|
||||
|
||||
[token]
|
||||
enforce_token_bind = kerberos
|
||||
|
||||
*Do not* set ``enforce_token_bind = named`` as there is not an authentication
|
||||
mechanism called ``named``.
|
||||
|
||||
|
||||
Sample Configuration Files
|
||||
--------------------------
|
||||
|
||||
|
@ -133,6 +133,15 @@
|
||||
# Amount of time a token should remain valid (in seconds)
|
||||
# expiration = 86400
|
||||
|
||||
# External auth mechanisms that should add bind information to token.
|
||||
# eg kerberos, x509
|
||||
# bind =
|
||||
|
||||
# Enforcement policy on tokens presented to keystone with bind information.
|
||||
# One of disabled, permissive, strict, required or a specifically required bind
|
||||
# mode e.g. kerberos or x509 to require binding to that authentication.
|
||||
# enforce_token_bind = permissive
|
||||
|
||||
[policy]
|
||||
# driver = keystone.policy.backends.sql.Policy
|
||||
|
||||
|
@ -283,7 +283,7 @@ class Auth(controller.V3Controller):
|
||||
|
||||
try:
|
||||
auth_info = AuthInfo(context, auth=auth)
|
||||
auth_context = {'extras': {}, 'method_names': []}
|
||||
auth_context = {'extras': {}, 'method_names': [], 'bind': {}}
|
||||
self.authenticate(context, auth_info, auth_context)
|
||||
self._check_and_set_default_scoping(auth_info, auth_context)
|
||||
(domain_id, project_id, trust) = auth_info.get_scope()
|
||||
|
@ -42,6 +42,9 @@ class ExternalDefault(object):
|
||||
user_ref = auth_info.identity_api.get_user_by_name(username,
|
||||
domain_id)
|
||||
auth_context['user_id'] = user_ref['id']
|
||||
if ('kerberos' in CONF.token.bind and
|
||||
context.get('AUTH_TYPE', '').lower() == 'negotiate'):
|
||||
auth_context['bind']['kerberos'] = username
|
||||
except Exception:
|
||||
msg = _('Unable to lookup user %s') % (REMOTE_USER)
|
||||
raise exception.Unauthorized(msg)
|
||||
@ -75,6 +78,10 @@ class ExternalDomain(object):
|
||||
user_ref = auth_info.identity_api.get_user_by_name(username,
|
||||
domain_id)
|
||||
auth_context['user_id'] = user_ref['id']
|
||||
if ('kerberos' in CONF.token.bind and
|
||||
context.get('AUTH_TYPE', '').lower() == 'negotiate'):
|
||||
auth_context['bind']['kerberos'] = username
|
||||
|
||||
except Exception:
|
||||
msg = _('Unable to lookup user %s') % (REMOTE_USER)
|
||||
raise exception.Unauthorized(msg)
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
from keystone import auth
|
||||
from keystone.common import logging
|
||||
from keystone.common import wsgi
|
||||
from keystone import exception
|
||||
from keystone import token
|
||||
|
||||
@ -36,6 +37,7 @@ class Token(auth.AuthMethodHandler):
|
||||
target=METHOD_NAME)
|
||||
token_id = auth_payload['id']
|
||||
token_ref = self.token_api.get_token(token_id)
|
||||
wsgi.validate_token_bind(context, token_ref)
|
||||
user_context.setdefault(
|
||||
'user_id', token_ref['token_data']['token']['user']['id'])
|
||||
# to support Grizzly-3 to Grizzly-RC1 transition
|
||||
|
@ -217,6 +217,10 @@ def configure():
|
||||
# os_inherit
|
||||
register_bool('enabled', group='os_inherit', default=False)
|
||||
|
||||
# binding
|
||||
register_list('bind', group='token', default=[])
|
||||
register_str('enforce_token_bind', group='token', default='permissive')
|
||||
|
||||
# ssl
|
||||
register_bool('enable', group='ssl', default=False)
|
||||
register_str('certfile', group='ssl',
|
||||
|
@ -25,6 +25,10 @@ def _build_policy_check_credentials(self, action, context, kwargs):
|
||||
LOG.warning(_('RBAC: Invalid token'))
|
||||
raise exception.Unauthorized()
|
||||
|
||||
# NOTE(jamielennox): whilst this maybe shouldn't be within this function
|
||||
# it would otherwise need to reload the token_ref from backing store.
|
||||
wsgi.validate_token_bind(context, token_ref)
|
||||
|
||||
creds = {}
|
||||
if 'token_data' in token_ref and 'token' in token_ref['token_data']:
|
||||
#V3 Tokens
|
||||
|
@ -73,6 +73,55 @@ def mask_password(message, is_unicode=False, secret="***"):
|
||||
return result
|
||||
|
||||
|
||||
def validate_token_bind(context, token_ref):
|
||||
bind_mode = CONF.token.enforce_token_bind
|
||||
|
||||
if bind_mode == 'disabled':
|
||||
return
|
||||
|
||||
bind = token_ref.get('bind', {})
|
||||
|
||||
# permissive and strict modes don't require there to be a bind
|
||||
permissive = bind_mode in ('permissive', 'strict')
|
||||
|
||||
# get the named mode if bind_mode is not one of the known
|
||||
name = None if permissive or bind_mode == 'required' else bind_mode
|
||||
|
||||
if not bind:
|
||||
if permissive:
|
||||
# no bind provided and none required
|
||||
return
|
||||
else:
|
||||
LOG.info(_("No bind information present in token"))
|
||||
raise exception.Unauthorized()
|
||||
|
||||
if name and name not in bind:
|
||||
LOG.info(_("Named bind mode %s not in bind information"), name)
|
||||
raise exception.Unauthorized()
|
||||
|
||||
for bind_type, identifier in bind.iteritems():
|
||||
if bind_type == 'kerberos':
|
||||
if not context.get('AUTH_TYPE', '').lower() == 'negotiate':
|
||||
LOG.info(_("Kerberos credentials required and not present"))
|
||||
raise exception.Unauthorized()
|
||||
|
||||
if not context.get('REMOTE_USER') == identifier:
|
||||
LOG.info(_("Kerberos credentials do not match those in bind"))
|
||||
raise exception.Unauthorized()
|
||||
|
||||
LOG.info(_("Kerberos bind authentication successful"))
|
||||
|
||||
elif bind_mode == 'permissive':
|
||||
LOG.debug(_("Ignoring unknown bind for permissive mode: "
|
||||
"{%(bind_type)s: %(identifier)s}"),
|
||||
{'bind_type': bind_type, 'identifier': identifier})
|
||||
else:
|
||||
LOG.info(_("Couldn't verify unknown bind: "
|
||||
"{%(bind_type)s: %(identifier)s}"),
|
||||
{'bind_type': bind_type, 'identifier': identifier})
|
||||
raise exception.Unauthorized()
|
||||
|
||||
|
||||
class WritableLogger(object):
|
||||
"""A thin wrapper that responds to `write` and logs."""
|
||||
|
||||
@ -167,10 +216,16 @@ class Application(BaseApplication):
|
||||
context['headers'] = dict(req.headers.iteritems())
|
||||
context['path'] = req.environ['PATH_INFO']
|
||||
params = req.environ.get(PARAMS_ENV, {})
|
||||
if 'REMOTE_USER' in req.environ:
|
||||
context['REMOTE_USER'] = req.environ['REMOTE_USER']
|
||||
elif context.get('REMOTE_USER', None) is not None:
|
||||
del context['REMOTE_USER']
|
||||
|
||||
for name in ['REMOTE_USER', 'AUTH_TYPE']:
|
||||
try:
|
||||
context[name] = req.environ[name]
|
||||
except KeyError:
|
||||
try:
|
||||
del context[name]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
params.update(arg_dict)
|
||||
|
||||
context.setdefault('is_admin', False)
|
||||
@ -233,6 +288,7 @@ class Application(BaseApplication):
|
||||
except exception.TokenNotFound as e:
|
||||
raise exception.Unauthorized(e)
|
||||
|
||||
validate_token_bind(context, user_token_ref)
|
||||
creds = user_token_ref['metadata'].copy()
|
||||
|
||||
try:
|
||||
|
@ -5,6 +5,7 @@ from keystone.common import controller
|
||||
from keystone.common import dependency
|
||||
from keystone.common import logging
|
||||
from keystone.common import utils
|
||||
from keystone.common import wsgi
|
||||
from keystone import config
|
||||
from keystone import exception
|
||||
from keystone.openstack.common import timeutils
|
||||
@ -78,7 +79,7 @@ class Auth(controller.V2Controller):
|
||||
auth_info = self._authenticate_local(
|
||||
context, auth)
|
||||
|
||||
user_ref, tenant_ref, metadata_ref, expiry = auth_info
|
||||
user_ref, tenant_ref, metadata_ref, expiry, bind = auth_info
|
||||
core.validate_auth_info(self, user_ref, tenant_ref)
|
||||
user_ref = self._filter_domain_id(user_ref)
|
||||
if tenant_ref:
|
||||
@ -97,6 +98,8 @@ class Auth(controller.V2Controller):
|
||||
catalog_ref = {}
|
||||
|
||||
auth_token_data['id'] = 'placeholder'
|
||||
if bind:
|
||||
auth_token_data['bind'] = bind
|
||||
|
||||
roles_ref = []
|
||||
for role_id in metadata_ref.get('roles', []):
|
||||
@ -133,6 +136,8 @@ class Auth(controller.V2Controller):
|
||||
except exception.NotFound as e:
|
||||
raise exception.Unauthorized(e)
|
||||
|
||||
wsgi.validate_token_bind(context, old_token_ref)
|
||||
|
||||
#A trust token cannot be used to get another token
|
||||
if 'trust' in old_token_ref:
|
||||
raise exception.Forbidden()
|
||||
@ -194,7 +199,9 @@ class Auth(controller.V2Controller):
|
||||
metadata_ref['trustee_user_id'] = trust_ref['trustee_user_id']
|
||||
metadata_ref['trust_id'] = trust_id
|
||||
|
||||
return (current_user_ref, tenant_ref, metadata_ref, expiry)
|
||||
bind = old_token_ref.get('bind', None)
|
||||
|
||||
return (current_user_ref, tenant_ref, metadata_ref, expiry, bind)
|
||||
|
||||
def _authenticate_local(self, context, auth):
|
||||
"""Try to authenticate against the identity backend.
|
||||
@ -252,7 +259,7 @@ class Auth(controller.V2Controller):
|
||||
user_id, tenant_id)
|
||||
|
||||
expiry = core.default_expire_time()
|
||||
return (user_ref, tenant_ref, metadata_ref, expiry)
|
||||
return (user_ref, tenant_ref, metadata_ref, expiry, None)
|
||||
|
||||
def _authenticate_external(self, context, auth):
|
||||
"""Try to authenticate an external user via REMOTE_USER variable.
|
||||
@ -281,7 +288,12 @@ class Auth(controller.V2Controller):
|
||||
user_id, tenant_id)
|
||||
|
||||
expiry = core.default_expire_time()
|
||||
return (user_ref, tenant_ref, metadata_ref, expiry)
|
||||
bind = None
|
||||
if ('kerberos' in CONF.token.bind and
|
||||
context.get('AUTH_TYPE', '').lower() == 'negotiate'):
|
||||
bind = {'kerberos': username}
|
||||
|
||||
return (user_ref, tenant_ref, metadata_ref, expiry, bind)
|
||||
|
||||
def _get_auth_token_data(self, user, tenant, metadata, expiry):
|
||||
return dict(user=user,
|
||||
|
@ -59,6 +59,8 @@ class V2TokenDataHelper(object):
|
||||
}
|
||||
}
|
||||
}
|
||||
if 'bind' in token_ref:
|
||||
o['access']['token']['bind'] = token_ref['bind']
|
||||
if 'tenant' in token_ref and token_ref['tenant']:
|
||||
token_ref['tenant']['enabled'] = True
|
||||
o['access']['token']['tenant'] = token_ref['tenant']
|
||||
@ -285,7 +287,8 @@ class V3TokenDataHelper(object):
|
||||
|
||||
def get_token_data(self, user_id, method_names, extras,
|
||||
domain_id=None, project_id=None, expires=None,
|
||||
trust=None, token=None, include_catalog=True):
|
||||
trust=None, token=None, include_catalog=True,
|
||||
bind=None):
|
||||
token_data = {'methods': method_names,
|
||||
'extras': extras}
|
||||
|
||||
@ -299,6 +302,9 @@ class V3TokenDataHelper(object):
|
||||
if user_id != trust['trustee_user_id']:
|
||||
raise exception.Forbidden(_('User is not a trustee.'))
|
||||
|
||||
if bind:
|
||||
token_data['bind'] = bind
|
||||
|
||||
self._populate_scope(token_data, domain_id, project_id)
|
||||
self._populate_user(token_data, user_id, domain_id, project_id, trust)
|
||||
self._populate_roles(token_data, user_id, domain_id, project_id, trust)
|
||||
@ -346,6 +352,7 @@ class Provider(token.provider.Provider):
|
||||
tenant=token_ref['tenant'],
|
||||
metadata=token_ref['metadata'],
|
||||
token_data=token_data,
|
||||
bind=token_ref.get('bind'),
|
||||
trust_id=token_ref['metadata'].get('trust_id'))
|
||||
self.token_api.create_token(token_id, data)
|
||||
except Exception:
|
||||
@ -381,6 +388,7 @@ class Provider(token.provider.Provider):
|
||||
project_id=project_id,
|
||||
expires=expires_at,
|
||||
trust=trust,
|
||||
bind=auth_context.get('bind') if auth_context else None,
|
||||
include_catalog=include_catalog)
|
||||
|
||||
token_id = self._get_token_id(token_data)
|
||||
@ -542,6 +550,7 @@ class Provider(token.provider.Provider):
|
||||
['password', 'token'],
|
||||
{},
|
||||
project_id=project_id,
|
||||
bind=token_ref.get('bind'),
|
||||
expires=token_ref['expires'])
|
||||
return token_data
|
||||
|
||||
|
@ -1,3 +1,3 @@
|
||||
[auth]
|
||||
methods = external
|
||||
external = keystone.auth.plugins.external.ExternalDomain
|
||||
methods = external, password, token
|
||||
external = keystone.auth.plugins.external.ExternalDomain
|
||||
|
@ -373,6 +373,33 @@ class AuthWithToken(AuthTest):
|
||||
dict(is_admin=True, query_string={'belongsTo': 'BAR'}),
|
||||
token_id=scoped_token_id)
|
||||
|
||||
def test_token_auth_with_binding(self):
|
||||
CONF.token.bind = ['kerberos']
|
||||
body_dict = _build_user_auth()
|
||||
context = {'REMOTE_USER': 'FOO', 'AUTH_TYPE': 'Negotiate'}
|
||||
unscoped_token = self.controller.authenticate(context, body_dict)
|
||||
|
||||
# the token should have bind information in it
|
||||
bind = unscoped_token['access']['token']['bind']
|
||||
self.assertEqual(bind['kerberos'], 'FOO')
|
||||
|
||||
body_dict = _build_user_auth(
|
||||
token=unscoped_token['access']['token'],
|
||||
tenant_name='BAR')
|
||||
|
||||
# using unscoped token without remote user context fails
|
||||
self.assertRaises(
|
||||
exception.Unauthorized,
|
||||
self.controller.authenticate,
|
||||
{}, body_dict)
|
||||
|
||||
# using token with remote user context succeeds
|
||||
scoped_token = self.controller.authenticate(context, body_dict)
|
||||
|
||||
# the bind information should be carried over from the original token
|
||||
bind = scoped_token['access']['token']['bind']
|
||||
self.assertEqual(bind['kerberos'], 'FOO')
|
||||
|
||||
|
||||
class AuthWithPasswordCredentials(AuthTest):
|
||||
def setUp(self):
|
||||
@ -431,6 +458,13 @@ class AuthWithPasswordCredentials(AuthTest):
|
||||
self.controller.authenticate,
|
||||
{}, body_dict)
|
||||
|
||||
def test_bind_without_remote_user(self):
|
||||
CONF.token.bind = ['kerberos']
|
||||
body_dict = _build_user_auth(username='FOO', password='foo2',
|
||||
tenant_name='BAR')
|
||||
token = self.controller.authenticate({}, body_dict)
|
||||
self.assertNotIn('bind', token['access']['token'])
|
||||
|
||||
|
||||
class AuthWithRemoteUser(AuthTest):
|
||||
def setUp(self):
|
||||
@ -498,6 +532,20 @@ class AuthWithRemoteUser(AuthTest):
|
||||
{'REMOTE_USER': uuid.uuid4().hex},
|
||||
body_dict)
|
||||
|
||||
def test_bind_with_kerberos(self):
|
||||
CONF.token.bind = ['kerberos']
|
||||
kerb = {'REMOTE_USER': 'FOO', 'AUTH_TYPE': 'Negotiate'}
|
||||
body_dict = _build_user_auth(tenant_name="BAR")
|
||||
token = self.controller.authenticate(kerb, body_dict)
|
||||
self.assertEqual(token['access']['token']['bind']['kerberos'], 'FOO')
|
||||
|
||||
def test_bind_without_config_opt(self):
|
||||
CONF.token.bind = ['x509']
|
||||
kerb = {'REMOTE_USER': 'FOO', 'AUTH_TYPE': 'Negotiate'}
|
||||
body_dict = _build_user_auth(tenant_name='BAR')
|
||||
token = self.controller.authenticate(kerb, body_dict)
|
||||
self.assertNotIn('bind', token['access']['token'])
|
||||
|
||||
|
||||
class AuthWithTrust(AuthTest):
|
||||
def setUp(self):
|
||||
|
182
tests/test_token_bind.py
Normal file
182
tests/test_token_bind.py
Normal file
@ -0,0 +1,182 @@
|
||||
# Copyright 2013 OpenStack LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from keystone.common import wsgi
|
||||
from keystone import config
|
||||
from keystone import exception
|
||||
from keystone import test
|
||||
|
||||
CONF = config.CONF
|
||||
|
||||
KERBEROS_BIND = 'USER@REALM'
|
||||
|
||||
# the only thing the function checks for is the presence of bind
|
||||
TOKEN_BIND_KERB = {'bind': {'kerberos': KERBEROS_BIND}}
|
||||
TOKEN_BIND_UNKNOWN = {'bind': {'FOO': 'BAR'}}
|
||||
TOKEN_BIND_NONE = {}
|
||||
|
||||
ANY = 'any'
|
||||
ALL_TOKENS = [TOKEN_BIND_KERB, TOKEN_BIND_UNKNOWN, TOKEN_BIND_NONE]
|
||||
|
||||
|
||||
class BindTest(test.TestCase):
|
||||
"""Test binding tokens to a Principal.
|
||||
|
||||
Even though everything in this file references kerberos the same concepts
|
||||
will apply to all future binding mechanisms.
|
||||
"""
|
||||
|
||||
def assert_kerberos_bind(self, tokens, bind_level,
|
||||
use_kerberos=True, success=True):
|
||||
if not isinstance(tokens, dict):
|
||||
for token in tokens:
|
||||
self.assert_kerberos_bind(token, bind_level,
|
||||
use_kerberos=use_kerberos,
|
||||
success=success)
|
||||
elif use_kerberos == ANY:
|
||||
for val in (True, False):
|
||||
self.assert_kerberos_bind(tokens, bind_level,
|
||||
use_kerberos=val, success=success)
|
||||
else:
|
||||
context = {}
|
||||
CONF.token.enforce_token_bind = bind_level
|
||||
|
||||
if use_kerberos:
|
||||
context['REMOTE_USER'] = KERBEROS_BIND
|
||||
context['AUTH_TYPE'] = 'Negotiate'
|
||||
|
||||
if not success:
|
||||
self.assertRaises(exception.Unauthorized,
|
||||
wsgi.validate_token_bind,
|
||||
context, tokens)
|
||||
else:
|
||||
wsgi.validate_token_bind(context, tokens)
|
||||
|
||||
# DISABLED
|
||||
|
||||
def test_bind_disabled_with_kerb_user(self):
|
||||
self.assert_kerberos_bind(ALL_TOKENS,
|
||||
bind_level='disabled',
|
||||
use_kerberos=ANY,
|
||||
success=True)
|
||||
|
||||
# PERMISSIVE
|
||||
|
||||
def test_bind_permissive_with_kerb_user(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_KERB,
|
||||
bind_level='permissive',
|
||||
use_kerberos=True,
|
||||
success=True)
|
||||
|
||||
def test_bind_permissive_with_regular_token(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_NONE,
|
||||
bind_level='permissive',
|
||||
use_kerberos=ANY,
|
||||
success=True)
|
||||
|
||||
def test_bind_permissive_without_kerb_user(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_KERB,
|
||||
bind_level='permissive',
|
||||
use_kerberos=False,
|
||||
success=False)
|
||||
|
||||
def test_bind_permissive_with_unknown_bind(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_UNKNOWN,
|
||||
bind_level='permissive',
|
||||
use_kerberos=ANY,
|
||||
success=True)
|
||||
|
||||
# STRICT
|
||||
|
||||
def test_bind_strict_with_regular_token(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_NONE,
|
||||
bind_level='strict',
|
||||
use_kerberos=ANY,
|
||||
success=True)
|
||||
|
||||
def test_bind_strict_with_kerb_user(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_KERB,
|
||||
bind_level='strict',
|
||||
use_kerberos=True,
|
||||
success=True)
|
||||
|
||||
def test_bind_strict_without_kerb_user(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_KERB,
|
||||
bind_level='strict',
|
||||
use_kerberos=False,
|
||||
success=False)
|
||||
|
||||
def test_bind_strict_with_unknown_bind(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_UNKNOWN,
|
||||
bind_level='strict',
|
||||
use_kerberos=ANY,
|
||||
success=False)
|
||||
|
||||
# REQUIRED
|
||||
|
||||
def test_bind_required_with_regular_token(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_NONE,
|
||||
bind_level='required',
|
||||
use_kerberos=ANY,
|
||||
success=False)
|
||||
|
||||
def test_bind_required_with_kerb_user(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_KERB,
|
||||
bind_level='required',
|
||||
use_kerberos=True,
|
||||
success=True)
|
||||
|
||||
def test_bind_required_without_kerb_user(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_KERB,
|
||||
bind_level='required',
|
||||
use_kerberos=False,
|
||||
success=False)
|
||||
|
||||
def test_bind_required_with_unknown_bind(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_UNKNOWN,
|
||||
bind_level='required',
|
||||
use_kerberos=ANY,
|
||||
success=False)
|
||||
|
||||
# NAMED
|
||||
|
||||
def test_bind_named_with_regular_token(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_NONE,
|
||||
bind_level='kerberos',
|
||||
use_kerberos=ANY,
|
||||
success=False)
|
||||
|
||||
def test_bind_named_with_kerb_user(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_KERB,
|
||||
bind_level='kerberos',
|
||||
use_kerberos=True,
|
||||
success=True)
|
||||
|
||||
def test_bind_named_without_kerb_user(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_KERB,
|
||||
bind_level='kerberos',
|
||||
use_kerberos=False,
|
||||
success=False)
|
||||
|
||||
def test_bind_named_with_unknown_bind(self):
|
||||
self.assert_kerberos_bind(TOKEN_BIND_UNKNOWN,
|
||||
bind_level='kerberos',
|
||||
use_kerberos=ANY,
|
||||
success=False)
|
||||
|
||||
def test_bind_named_with_unknown_scheme(self):
|
||||
self.assert_kerberos_bind(ALL_TOKENS,
|
||||
bind_level='unknown',
|
||||
use_kerberos=ANY,
|
||||
success=False)
|
@ -767,6 +767,8 @@ class TestAuthExternalDisabled(test_v3.RestfulTestCase):
|
||||
|
||||
|
||||
class TestAuthExternalDomain(test_v3.RestfulTestCase):
|
||||
content_type = 'json'
|
||||
|
||||
def config_files(self):
|
||||
list = self._config_file_list[:]
|
||||
list.append('auth_plugin_external_domain.conf')
|
||||
@ -782,6 +784,27 @@ class TestAuthExternalDomain(test_v3.RestfulTestCase):
|
||||
api.authenticate(context, auth_info, auth_context)
|
||||
self.assertEqual(auth_context['user_id'], self.user['id'])
|
||||
|
||||
def test_project_id_scoped_with_remote_user(self):
|
||||
CONF.token.bind = ['kerberos']
|
||||
auth_data = self.build_authentication_request(
|
||||
project_id=self.project['id'])
|
||||
remote_user = '%s@%s' % (self.user['name'], self.domain['name'])
|
||||
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
|
||||
'AUTH_TYPE': 'Negotiate'})
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
token = self.assertValidProjectScopedTokenResponse(r)
|
||||
self.assertEquals(token['bind']['kerberos'], self.user['name'])
|
||||
|
||||
def test_unscoped_bind_with_remote_user(self):
|
||||
CONF.token.bind = ['kerberos']
|
||||
auth_data = self.build_authentication_request()
|
||||
remote_user = '%s@%s' % (self.user['name'], self.domain['name'])
|
||||
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
|
||||
'AUTH_TYPE': 'Negotiate'})
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
token = self.assertValidUnscopedTokenResponse(r)
|
||||
self.assertEquals(token['bind']['kerberos'], self.user['name'])
|
||||
|
||||
|
||||
class TestAuthJSON(test_v3.RestfulTestCase):
|
||||
content_type = 'json'
|
||||
@ -1303,6 +1326,84 @@ class TestAuthJSON(test_v3.RestfulTestCase):
|
||||
auth_info,
|
||||
auth_context)
|
||||
|
||||
def test_bind_not_set_with_remote_user(self):
|
||||
CONF.token.bind = []
|
||||
auth_data = self.build_authentication_request()
|
||||
remote_user = self.default_domain_user['name']
|
||||
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
|
||||
'AUTH_TYPE': 'Negotiate'})
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
token = self.assertValidUnscopedTokenResponse(r)
|
||||
self.assertNotIn('bind', token)
|
||||
|
||||
#TODO(ayoung): move to TestPKITokenAPIs; it will be run for both formats
|
||||
def test_verify_with_bound_token(self):
|
||||
self.opt_in_group('token', bind='kerberos')
|
||||
auth_data = self.build_authentication_request(
|
||||
project_id=self.project['id'])
|
||||
remote_user = self.default_domain_user['name']
|
||||
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
|
||||
'AUTH_TYPE': 'Negotiate'})
|
||||
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
|
||||
token = resp.headers.get('X-Subject-Token')
|
||||
headers = {'X-Subject-Token': token}
|
||||
r = self.get('/auth/tokens', headers=headers, token=token)
|
||||
token = self.assertValidProjectScopedTokenResponse(r)
|
||||
self.assertEqual(token['bind']['kerberos'],
|
||||
self.default_domain_user['name'])
|
||||
|
||||
def test_auth_with_bind_token(self):
|
||||
CONF.token.bind = ['kerberos']
|
||||
|
||||
auth_data = self.build_authentication_request()
|
||||
remote_user = self.default_domain_user['name']
|
||||
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
|
||||
'AUTH_TYPE': 'Negotiate'})
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
|
||||
# the unscoped token should have bind information in it
|
||||
token = self.assertValidUnscopedTokenResponse(r)
|
||||
self.assertEqual(token['bind']['kerberos'], remote_user)
|
||||
|
||||
token = r.headers.get('X-Subject-Token')
|
||||
|
||||
# using unscoped token with remote user succeeds
|
||||
auth_params = {'token': token, 'project_id': self.project_id}
|
||||
auth_data = self.build_authentication_request(**auth_params)
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
token = self.assertValidProjectScopedTokenResponse(r)
|
||||
|
||||
# the bind information should be carried over from the original token
|
||||
self.assertEqual(token['bind']['kerberos'], remote_user)
|
||||
|
||||
def test_v2_v3_bind_token_intermix(self):
|
||||
self.opt_in_group('token', bind='kerberos')
|
||||
|
||||
# we need our own user registered to the default domain because of
|
||||
# the way external auth works.
|
||||
remote_user = self.default_domain_user['name']
|
||||
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
|
||||
'AUTH_TYPE': 'Negotiate'})
|
||||
body = {'auth': {}}
|
||||
resp = self.admin_request(path='/v2.0/tokens',
|
||||
method='POST',
|
||||
body=body)
|
||||
|
||||
v2_token_data = resp.result
|
||||
|
||||
bind = v2_token_data['access']['token']['bind']
|
||||
self.assertEqual(bind['kerberos'], self.default_domain_user['name'])
|
||||
|
||||
v2_token_id = v2_token_data['access']['token']['id']
|
||||
headers = {'X-Subject-Token': v2_token_id}
|
||||
resp = self.get('/auth/tokens', headers=headers)
|
||||
token_data = resp.result
|
||||
|
||||
self.assertDictEqual(v2_token_data['access']['token']['bind'],
|
||||
token_data['token']['bind'])
|
||||
|
||||
|
||||
class TestAuthXML(TestAuthJSON):
|
||||
content_type = 'xml'
|
||||
|
Loading…
x
Reference in New Issue
Block a user