Cleanup TODO, AuthContext and AuthInfo to auth.core

Moved AuthContext and AuthInfo to keystone.auth.core as they are shared
code bits and not exclusively controller specific.

Change-Id: I649690d9e39057249e674500d85a053e0c28b30e
This commit is contained in:
Morgan Fainberg 2017-01-29 14:08:21 -08:00
parent 1451659e48
commit 29951be748
7 changed files with 309 additions and 313 deletions

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import sys
from keystoneclient.common import cms
from oslo_log import log
from oslo_serialization import jsonutils
@ -29,7 +27,7 @@ from keystone.common import wsgi
import keystone.conf
from keystone import exception
from keystone.federation import constants
from keystone.i18n import _, _LI, _LW, _LE
from keystone.i18n import _, _LW, _LE
from keystone.resource import controllers as resource_controllers
@ -38,284 +36,6 @@ LOG = log.getLogger(__name__)
CONF = keystone.conf.CONF
# TODO(notmorgan): Move Common Auth Code (AuthContext and AuthInfo)
# loading into keystone.auth.core (and update all references to the new
# locations)
class AuthContext(dict):
"""Retrofitting auth_context to reconcile identity attributes.
The identity attributes must not have conflicting values among the
auth plug-ins. The only exception is `expires_at`, which is set to its
earliest value.
"""
# identity attributes need to be reconciled among the auth plugins
IDENTITY_ATTRIBUTES = frozenset(['user_id', 'project_id',
'access_token_id', 'domain_id',
'expires_at'])
def __setitem__(self, key, val):
"""Override __setitem__ to prevent conflicting values."""
if key in self.IDENTITY_ATTRIBUTES and key in self:
existing_val = self[key]
if key == 'expires_at':
# special treatment for 'expires_at', we are going to take
# the earliest expiration instead.
if existing_val != val:
LOG.info(_LI('"expires_at" has conflicting values '
'%(existing)s and %(new)s. Will use the '
'earliest value.'),
{'existing': existing_val, 'new': val})
if existing_val is None or val is None:
val = existing_val or val
else:
val = min(existing_val, val)
elif existing_val != val:
msg = _('Unable to reconcile identity attribute %(attribute)s '
'as it has conflicting values %(new)s and %(old)s') % (
{'attribute': key,
'new': val,
'old': existing_val})
raise exception.Unauthorized(msg)
return super(AuthContext, self).__setitem__(key, val)
def update(self, E=None, **F):
"""Override update to prevent conflicting values."""
# NOTE(notmorgan): This will not be nearly as performant as the
# use of the built-in "update" method on the dict, however, the
# volume of data being changed here is very minimal in most cases
# and should not see a significant impact by iterating instead of
# explicit setting of values.
update_dicts = (E or {}, F or {})
for d in update_dicts:
for key, val in d.items():
self[key] = val
@dependency.requires('resource_api', 'trust_api')
class AuthInfo(object):
"""Encapsulation of "auth" request."""
@staticmethod
def create(auth=None, scope_only=False):
auth_info = AuthInfo(auth=auth)
auth_info._validate_and_normalize_auth_data(scope_only)
return auth_info
def __init__(self, auth=None):
self.auth = auth
self._scope_data = (None, None, None, None)
# self._scope_data is (domain_id, project_id, trust_ref, unscoped)
# project scope: (None, project_id, None, None)
# domain scope: (domain_id, None, None, None)
# trust scope: (None, None, trust_ref, None)
# unscoped: (None, None, None, 'unscoped')
def _assert_project_is_enabled(self, project_ref):
# ensure the project is enabled
try:
self.resource_api.assert_project_enabled(
project_id=project_ref['id'],
project=project_ref)
except AssertionError as e:
LOG.warning(six.text_type(e))
six.reraise(exception.Unauthorized, exception.Unauthorized(e),
sys.exc_info()[2])
def _assert_domain_is_enabled(self, domain_ref):
try:
self.resource_api.assert_domain_enabled(
domain_id=domain_ref['id'],
domain=domain_ref)
except AssertionError as e:
LOG.warning(six.text_type(e))
six.reraise(exception.Unauthorized, exception.Unauthorized(e),
sys.exc_info()[2])
def _lookup_domain(self, domain_info):
domain_id = domain_info.get('id')
domain_name = domain_info.get('name')
try:
if domain_name:
if (CONF.resource.domain_name_url_safe == 'strict' and
utils.is_not_url_safe(domain_name)):
msg = _('Domain name cannot contain reserved characters.')
LOG.warning(msg)
raise exception.Unauthorized(message=msg)
domain_ref = self.resource_api.get_domain_by_name(
domain_name)
else:
domain_ref = self.resource_api.get_domain(domain_id)
except exception.DomainNotFound as e:
LOG.warning(six.text_type(e))
raise exception.Unauthorized(e)
self._assert_domain_is_enabled(domain_ref)
return domain_ref
def _lookup_project(self, project_info):
project_id = project_info.get('id')
project_name = project_info.get('name')
try:
if project_name:
if (CONF.resource.project_name_url_safe == 'strict' and
utils.is_not_url_safe(project_name)):
msg = _('Project name cannot contain reserved characters.')
LOG.warning(msg)
raise exception.Unauthorized(message=msg)
if 'domain' not in project_info:
raise exception.ValidationError(attribute='domain',
target='project')
domain_ref = self._lookup_domain(project_info['domain'])
project_ref = self.resource_api.get_project_by_name(
project_name, domain_ref['id'])
else:
project_ref = self.resource_api.get_project(project_id)
# NOTE(morganfainberg): The _lookup_domain method will raise
# exception.Unauthorized if the domain isn't found or is
# disabled.
self._lookup_domain({'id': project_ref['domain_id']})
except exception.ProjectNotFound as e:
LOG.warning(six.text_type(e))
raise exception.Unauthorized(e)
self._assert_project_is_enabled(project_ref)
return project_ref
def _lookup_trust(self, trust_info):
trust_id = trust_info.get('id')
if not trust_id:
raise exception.ValidationError(attribute='trust_id',
target='trust')
trust = self.trust_api.get_trust(trust_id)
return trust
def _validate_and_normalize_scope_data(self):
"""Validate and normalize scope data."""
if 'scope' not in self.auth:
return
if sum(['project' in self.auth['scope'],
'domain' in self.auth['scope'],
'unscoped' in self.auth['scope'],
'OS-TRUST:trust' in self.auth['scope']]) != 1:
raise exception.ValidationError(
attribute='project, domain, OS-TRUST:trust or unscoped',
target='scope')
if 'unscoped' in self.auth['scope']:
self._scope_data = (None, None, None, 'unscoped')
return
if 'project' in self.auth['scope']:
project_ref = self._lookup_project(self.auth['scope']['project'])
self._scope_data = (None, project_ref['id'], None, None)
elif 'domain' in self.auth['scope']:
domain_ref = self._lookup_domain(self.auth['scope']['domain'])
self._scope_data = (domain_ref['id'], None, None, None)
elif 'OS-TRUST:trust' in self.auth['scope']:
if not CONF.trust.enabled:
raise exception.Forbidden('Trusts are disabled.')
trust_ref = self._lookup_trust(
self.auth['scope']['OS-TRUST:trust'])
# TODO(ayoung): when trusts support domains, fill in domain data
if trust_ref.get('project_id') is not None:
project_ref = self._lookup_project(
{'id': trust_ref['project_id']})
self._scope_data = (None, project_ref['id'], trust_ref, None)
else:
self._scope_data = (None, None, trust_ref, None)
def _validate_auth_methods(self):
# make sure all the method data/payload are provided
for method_name in self.get_method_names():
if method_name not in self.auth['identity']:
raise exception.ValidationError(attribute=method_name,
target='identity')
# make sure auth method is supported
for method_name in self.get_method_names():
if method_name not in core.AUTH_METHODS:
raise exception.AuthMethodNotSupported()
def _validate_and_normalize_auth_data(self, scope_only=False):
"""Make sure "auth" is valid.
:param scope_only: If it is True, auth methods will not be
validated but only the scope data.
:type scope_only: boolean
"""
# make sure "auth" exist
if not self.auth:
raise exception.ValidationError(attribute='auth',
target='request body')
# NOTE(chioleong): Tokenless auth does not provide auth methods,
# we only care about using this method to validate the scope
# information. Therefore, validating the auth methods here is
# insignificant and we can skip it when scope_only is set to
# true.
if scope_only is False:
self._validate_auth_methods()
self._validate_and_normalize_scope_data()
def get_method_names(self):
"""Return the identity method names.
:returns: list of auth method names
"""
# Sanitizes methods received in request's body
# Filters out duplicates, while keeping elements' order.
method_names = []
for method in self.auth['identity']['methods']:
if method not in method_names:
method_names.append(method)
return method_names
def get_method_data(self, method):
"""Get the auth method payload.
:returns: auth method payload
"""
if method not in self.auth['identity']['methods']:
raise exception.ValidationError(attribute=method,
target='identity')
return self.auth['identity'][method]
def get_scope(self):
"""Get scope information.
Verify and return the scoping information.
:returns: (domain_id, project_id, trust_ref, unscoped).
If scope to a project, (None, project_id, None, None)
will be returned.
If scoped to a domain, (domain_id, None, None, None)
will be returned.
If scoped to a trust, (None, project_id, trust_ref, None),
Will be returned, where the project_id comes from the
trust definition.
If unscoped, (None, None, None, 'unscoped') will be
returned.
"""
return self._scope_data
def set_scope(self, domain_id=None, project_id=None, trust=None,
unscoped=None):
"""Set scope information."""
if domain_id and project_id:
msg = _('Scoping to both domain and project is not allowed')
raise ValueError(msg)
if domain_id and trust:
msg = _('Scoping to both domain and trust is not allowed')
raise ValueError(msg)
if project_id and trust:
msg = _('Scoping to both project and trust is not allowed')
raise ValueError(msg)
self._scope_data = (domain_id, project_id, trust, unscoped)
def validate_issue_token_auth(auth=None):
if auth is None:
return
@ -390,10 +110,10 @@ class Auth(controller.V3Controller):
validate_issue_token_auth(auth)
try:
auth_info = AuthInfo.create(auth=auth)
auth_context = AuthContext(extras={},
method_names=[],
bind={})
auth_info = core.AuthInfo.create(auth=auth)
auth_context = core.AuthContext(extras={},
method_names=[],
bind={})
self.authenticate(request, auth_info, auth_context)
if auth_context.get('access_token_id'):
auth_info.set_scope(None, auth_context['project_id'], None)
@ -505,7 +225,7 @@ class Auth(controller.V3Controller):
# `auth_context` is an instance of AuthContext is extra insurance and
# will prevent regressions.
if not isinstance(auth_context, AuthContext):
if not isinstance(auth_context, core.AuthContext):
LOG.error(
_LE('`auth_context` passed to the Auth controller '
'`authenticate` method is not of type '

View File

@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import sys
from oslo_log import log
from oslo_log import versionutils
from oslo_utils import importutils
@ -17,6 +19,7 @@ import six
import stevedore
from keystone.common import dependency
from keystone.common import utils
import keystone.conf
from keystone import exception
from keystone.i18n import _, _LI, _LE
@ -77,6 +80,279 @@ def get_auth_method(method_name):
return AUTH_METHODS[method_name]
class AuthContext(dict):
"""Retrofitting auth_context to reconcile identity attributes.
The identity attributes must not have conflicting values among the
auth plug-ins. The only exception is `expires_at`, which is set to its
earliest value.
"""
# identity attributes need to be reconciled among the auth plugins
IDENTITY_ATTRIBUTES = frozenset(['user_id', 'project_id',
'access_token_id', 'domain_id',
'expires_at'])
def __setitem__(self, key, val):
"""Override __setitem__ to prevent conflicting values."""
if key in self.IDENTITY_ATTRIBUTES and key in self:
existing_val = self[key]
if key == 'expires_at':
# special treatment for 'expires_at', we are going to take
# the earliest expiration instead.
if existing_val != val:
LOG.info(_LI('"expires_at" has conflicting values '
'%(existing)s and %(new)s. Will use the '
'earliest value.'),
{'existing': existing_val, 'new': val})
if existing_val is None or val is None:
val = existing_val or val
else:
val = min(existing_val, val)
elif existing_val != val:
msg = _('Unable to reconcile identity attribute %(attribute)s '
'as it has conflicting values %(new)s and %(old)s') % (
{'attribute': key,
'new': val,
'old': existing_val})
raise exception.Unauthorized(msg)
return super(AuthContext, self).__setitem__(key, val)
def update(self, E=None, **F):
"""Override update to prevent conflicting values."""
# NOTE(notmorgan): This will not be nearly as performant as the
# use of the built-in "update" method on the dict, however, the
# volume of data being changed here is very minimal in most cases
# and should not see a significant impact by iterating instead of
# explicit setting of values.
update_dicts = (E or {}, F or {})
for d in update_dicts:
for key, val in d.items():
self[key] = val
@dependency.requires('resource_api', 'trust_api')
class AuthInfo(object):
"""Encapsulation of "auth" request."""
@staticmethod
def create(auth=None, scope_only=False):
auth_info = AuthInfo(auth=auth)
auth_info._validate_and_normalize_auth_data(scope_only)
return auth_info
def __init__(self, auth=None):
self.auth = auth
self._scope_data = (None, None, None, None)
# self._scope_data is (domain_id, project_id, trust_ref, unscoped)
# project scope: (None, project_id, None, None)
# domain scope: (domain_id, None, None, None)
# trust scope: (None, None, trust_ref, None)
# unscoped: (None, None, None, 'unscoped')
def _assert_project_is_enabled(self, project_ref):
# ensure the project is enabled
try:
self.resource_api.assert_project_enabled(
project_id=project_ref['id'],
project=project_ref)
except AssertionError as e:
LOG.warning(six.text_type(e))
six.reraise(exception.Unauthorized, exception.Unauthorized(e),
sys.exc_info()[2])
def _assert_domain_is_enabled(self, domain_ref):
try:
self.resource_api.assert_domain_enabled(
domain_id=domain_ref['id'],
domain=domain_ref)
except AssertionError as e:
LOG.warning(six.text_type(e))
six.reraise(exception.Unauthorized, exception.Unauthorized(e),
sys.exc_info()[2])
def _lookup_domain(self, domain_info):
domain_id = domain_info.get('id')
domain_name = domain_info.get('name')
try:
if domain_name:
if (CONF.resource.domain_name_url_safe == 'strict' and
utils.is_not_url_safe(domain_name)):
msg = _('Domain name cannot contain reserved characters.')
LOG.warning(msg)
raise exception.Unauthorized(message=msg)
domain_ref = self.resource_api.get_domain_by_name(
domain_name)
else:
domain_ref = self.resource_api.get_domain(domain_id)
except exception.DomainNotFound as e:
LOG.warning(six.text_type(e))
raise exception.Unauthorized(e)
self._assert_domain_is_enabled(domain_ref)
return domain_ref
def _lookup_project(self, project_info):
project_id = project_info.get('id')
project_name = project_info.get('name')
try:
if project_name:
if (CONF.resource.project_name_url_safe == 'strict' and
utils.is_not_url_safe(project_name)):
msg = _('Project name cannot contain reserved characters.')
LOG.warning(msg)
raise exception.Unauthorized(message=msg)
if 'domain' not in project_info:
raise exception.ValidationError(attribute='domain',
target='project')
domain_ref = self._lookup_domain(project_info['domain'])
project_ref = self.resource_api.get_project_by_name(
project_name, domain_ref['id'])
else:
project_ref = self.resource_api.get_project(project_id)
# NOTE(morganfainberg): The _lookup_domain method will raise
# exception.Unauthorized if the domain isn't found or is
# disabled.
self._lookup_domain({'id': project_ref['domain_id']})
except exception.ProjectNotFound as e:
LOG.warning(six.text_type(e))
raise exception.Unauthorized(e)
self._assert_project_is_enabled(project_ref)
return project_ref
def _lookup_trust(self, trust_info):
trust_id = trust_info.get('id')
if not trust_id:
raise exception.ValidationError(attribute='trust_id',
target='trust')
trust = self.trust_api.get_trust(trust_id)
return trust
def _validate_and_normalize_scope_data(self):
"""Validate and normalize scope data."""
if 'scope' not in self.auth:
return
if sum(['project' in self.auth['scope'],
'domain' in self.auth['scope'],
'unscoped' in self.auth['scope'],
'OS-TRUST:trust' in self.auth['scope']]) != 1:
raise exception.ValidationError(
attribute='project, domain, OS-TRUST:trust or unscoped',
target='scope')
if 'unscoped' in self.auth['scope']:
self._scope_data = (None, None, None, 'unscoped')
return
if 'project' in self.auth['scope']:
project_ref = self._lookup_project(self.auth['scope']['project'])
self._scope_data = (None, project_ref['id'], None, None)
elif 'domain' in self.auth['scope']:
domain_ref = self._lookup_domain(self.auth['scope']['domain'])
self._scope_data = (domain_ref['id'], None, None, None)
elif 'OS-TRUST:trust' in self.auth['scope']:
if not CONF.trust.enabled:
raise exception.Forbidden('Trusts are disabled.')
trust_ref = self._lookup_trust(
self.auth['scope']['OS-TRUST:trust'])
# TODO(ayoung): when trusts support domains, fill in domain data
if trust_ref.get('project_id') is not None:
project_ref = self._lookup_project(
{'id': trust_ref['project_id']})
self._scope_data = (None, project_ref['id'], trust_ref, None)
else:
self._scope_data = (None, None, trust_ref, None)
def _validate_auth_methods(self):
# make sure all the method data/payload are provided
for method_name in self.get_method_names():
if method_name not in self.auth['identity']:
raise exception.ValidationError(attribute=method_name,
target='identity')
# make sure auth method is supported
for method_name in self.get_method_names():
if method_name not in AUTH_METHODS:
raise exception.AuthMethodNotSupported()
def _validate_and_normalize_auth_data(self, scope_only=False):
"""Make sure "auth" is valid.
:param scope_only: If it is True, auth methods will not be
validated but only the scope data.
:type scope_only: boolean
"""
# make sure "auth" exist
if not self.auth:
raise exception.ValidationError(attribute='auth',
target='request body')
# NOTE(chioleong): Tokenless auth does not provide auth methods,
# we only care about using this method to validate the scope
# information. Therefore, validating the auth methods here is
# insignificant and we can skip it when scope_only is set to
# true.
if scope_only is False:
self._validate_auth_methods()
self._validate_and_normalize_scope_data()
def get_method_names(self):
"""Return the identity method names.
:returns: list of auth method names
"""
# Sanitizes methods received in request's body
# Filters out duplicates, while keeping elements' order.
method_names = []
for method in self.auth['identity']['methods']:
if method not in method_names:
method_names.append(method)
return method_names
def get_method_data(self, method):
"""Get the auth method payload.
:returns: auth method payload
"""
if method not in self.auth['identity']['methods']:
raise exception.ValidationError(attribute=method,
target='identity')
return self.auth['identity'][method]
def get_scope(self):
"""Get scope information.
Verify and return the scoping information.
:returns: (domain_id, project_id, trust_ref, unscoped).
If scope to a project, (None, project_id, None, None)
will be returned.
If scoped to a domain, (domain_id, None, None, None)
will be returned.
If scoped to a trust, (None, project_id, trust_ref, None),
Will be returned, where the project_id comes from the
trust definition.
If unscoped, (None, None, None, 'unscoped') will be
returned.
"""
return self._scope_data
def set_scope(self, domain_id=None, project_id=None, trust=None,
unscoped=None):
"""Set scope information."""
if domain_id and project_id:
msg = _('Scoping to both domain and project is not allowed')
raise ValueError(msg)
if domain_id and trust:
msg = _('Scoping to both domain and trust is not allowed')
raise ValueError(msg)
if project_id and trust:
msg = _('Scoping to both project and trust is not allowed')
raise ValueError(msg)
self._scope_data = (domain_id, project_id, trust, unscoped)
@dependency.requires('identity_api')
class UserMFARulesValidator(object):
"""Helper object that can validate the MFA Rules."""

View File

@ -17,7 +17,7 @@ import hashlib
from oslo_log import log
from keystone.auth import controllers
from keystone.auth import core
from keystone.common import dependency
import keystone.conf
from keystone import exception
@ -92,7 +92,7 @@ class TokenlessAuthHelper(object):
auth['scope'] = self._build_scope_info()
# NOTE(chioleong): We'll let AuthInfo validate the scope for us
auth_info = controllers.AuthInfo.create(auth, scope_only=True)
auth_info = core.AuthInfo.create(auth, scope_only=True)
return auth_info.get_scope()
def get_mapped_user(self, project_id=None, domain_id=None):

View File

@ -62,7 +62,7 @@ class TestAuthPlugin(unit.SQLDriverOverrides, unit.TestCase):
auth_data[method_name] = {'test': 'test'}
auth_data = {'identity': auth_data}
self.assertRaises(exception.AuthMethodNotSupported,
auth.controllers.AuthInfo.create,
auth.core.AuthInfo.create,
auth_data)
def test_addition_auth_steps(self):
@ -76,8 +76,8 @@ class TestAuthPlugin(unit.SQLDriverOverrides, unit.TestCase):
auth_data[METHOD_NAME] = {
'test': 'test'}
auth_data = {'identity': auth_data}
auth_info = auth.controllers.AuthInfo.create(auth_data)
auth_context = auth.controllers.AuthContext(extras={}, method_names=[])
auth_info = auth.core.AuthInfo.create(auth_data)
auth_context = auth.core.AuthContext(extras={}, method_names=[])
try:
self.api.authenticate(self.make_request(), auth_info, auth_context)
except exception.AdditionalAuthRequired as e:
@ -91,8 +91,8 @@ class TestAuthPlugin(unit.SQLDriverOverrides, unit.TestCase):
auth_data[METHOD_NAME] = {
'response': EXPECTED_RESPONSE}
auth_data = {'identity': auth_data}
auth_info = auth.controllers.AuthInfo.create(auth_data)
auth_context = auth.controllers.AuthContext(extras={}, method_names=[])
auth_info = auth.core.AuthInfo.create(auth_data)
auth_context = auth.core.AuthContext(extras={}, method_names=[])
self.api.authenticate(self.make_request(), auth_info, auth_context)
self.assertEqual(DEMO_USER_ID, auth_context['user_id'])
@ -101,8 +101,8 @@ class TestAuthPlugin(unit.SQLDriverOverrides, unit.TestCase):
auth_data[METHOD_NAME] = {
'response': uuid.uuid4().hex}
auth_data = {'identity': auth_data}
auth_info = auth.controllers.AuthInfo.create(auth_data)
auth_context = auth.controllers.AuthContext(extras={}, method_names=[])
auth_info = auth.core.AuthInfo.create(auth_data)
auth_context = auth.core.AuthContext(extras={}, method_names=[])
self.assertRaises(exception.Unauthorized,
self.api.authenticate,
self.make_request(),
@ -152,8 +152,8 @@ class TestMapped(unit.TestCase):
method_name: {'protocol': method_name},
}
}
auth_info = auth.controllers.AuthInfo.create(auth_data)
auth_context = auth.controllers.AuthContext(
auth_info = auth.core.AuthInfo.create(auth_data)
auth_context = auth.core.AuthContext(
extras={},
method_names=[],
user_id=uuid.uuid4().hex)
@ -171,7 +171,7 @@ class TestMapped(unit.TestCase):
auth_data[method_name] = {'protocol': method_name}
auth_data = {'identity': auth_data}
auth_context = auth.controllers.AuthContext(
auth_context = auth.core.AuthContext(
extras={},
method_names=[],
user_id=uuid.uuid4().hex)
@ -181,7 +181,7 @@ class TestMapped(unit.TestCase):
with mock.patch.object(auth.plugins.mapped.Mapped,
'authenticate',
return_value=None) as authenticate:
auth_info = auth.controllers.AuthInfo.create(auth_data)
auth_info = auth.core.AuthInfo.create(auth_data)
request = self.make_request(environ={'REMOTE_USER': 'foo@idp.com'})
self.api.authenticate(request, auth_info, auth_context)
# make sure Mapped plugin got invoked with the correct payload

View File

@ -1289,8 +1289,8 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
if not auth_data:
auth_data = self.build_authentication_request(
kerberos=kerberos)['auth']
auth_info = auth.controllers.AuthInfo.create(auth_data)
auth_context = auth.controllers.AuthContext(extras={}, method_names=[])
auth_info = auth.core.AuthInfo.create(auth_data)
auth_context = auth.core.AuthContext(extras={}, method_names=[])
return self.make_request(environ=environment), auth_info, auth_context

View File

@ -216,14 +216,14 @@ class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
auth_data['abc'] = {'test': 'test'}
auth_data = {'identity': auth_data}
self.assertRaises(exception.AuthMethodNotSupported,
auth.controllers.AuthInfo.create,
auth.core.AuthInfo.create,
auth_data)
def test_missing_auth_method_data(self):
auth_data = {'methods': ['password']}
auth_data = {'identity': auth_data}
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo.create,
auth.core.AuthInfo.create,
auth_data)
def test_project_name_no_domain(self):
@ -232,7 +232,7 @@ class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
password='test',
project_name='abc')['auth']
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo.create,
auth.core.AuthInfo.create,
auth_data)
def test_both_project_and_domain_in_scope(self):
@ -242,7 +242,7 @@ class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
project_name='test',
domain_name='test')['auth']
self.assertRaises(exception.ValidationError,
auth.controllers.AuthInfo.create,
auth.core.AuthInfo.create,
auth_data)
def test_get_method_names_duplicates(self):
@ -252,7 +252,7 @@ class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
password='test')['auth']
auth_data['identity']['methods'] = ['password', 'token',
'password', 'password']
auth_info = auth.controllers.AuthInfo.create(auth_data)
auth_info = auth.core.AuthInfo.create(auth_data)
self.assertEqual(['password', 'token'],
auth_info.get_method_names())
@ -260,7 +260,7 @@ class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
auth_data = self.build_authentication_request(
user_id='test',
password='test')['auth']
auth_info = auth.controllers.AuthInfo.create(auth_data)
auth_info = auth.core.AuthInfo.create(auth_data)
method_name = uuid.uuid4().hex
self.assertRaises(exception.ValidationError,
@ -2291,8 +2291,8 @@ class TokenAPITests(object):
auth_data['identity']['methods'] = ["password", "external"]
auth_data['identity']['external'] = {}
api = auth.controllers.Auth()
auth_info = auth.controllers.AuthInfo(auth_data)
auth_context = auth.controllers.AuthContext(extras={}, methods=[])
auth_info = auth.core.AuthInfo(auth_data)
auth_context = auth.core.AuthContext(extras={}, methods=[])
self.assertRaises(exception.Unauthorized,
api.authenticate,
self.make_request(),
@ -4948,7 +4948,7 @@ class TestTrustChain(test_v3.RestfulTestCase):
class TestAuthContext(unit.TestCase):
def setUp(self):
super(TestAuthContext, self).setUp()
self.auth_context = auth.controllers.AuthContext()
self.auth_context = auth.core.AuthContext()
def test_pick_lowest_expires_at(self):
expires_at_1 = utils.isotime(timeutils.utcnow())
@ -4960,7 +4960,7 @@ class TestAuthContext(unit.TestCase):
self.assertEqual(expires_at_1, self.auth_context['expires_at'])
def test_identity_attribute_conflict(self):
for identity_attr in auth.controllers.AuthContext.IDENTITY_ATTRIBUTES:
for identity_attr in auth.core.AuthContext.IDENTITY_ATTRIBUTES:
self.auth_context[identity_attr] = uuid.uuid4().hex
if identity_attr == 'expires_at':
# 'expires_at' is a special case. Will test it in a separate
@ -4973,7 +4973,7 @@ class TestAuthContext(unit.TestCase):
uuid.uuid4().hex)
def test_identity_attribute_conflict_with_none_value(self):
for identity_attr in auth.controllers.AuthContext.IDENTITY_ATTRIBUTES:
for identity_attr in auth.core.AuthContext.IDENTITY_ATTRIBUTES:
self.auth_context[identity_attr] = None
if identity_attr == 'expires_at':

View File

@ -146,7 +146,7 @@ class TestValidate(unit.TestCase):
federation_constants.IDENTITY_PROVIDER: identity_provider,
federation_constants.PROTOCOL: protocol,
}
auth_context = auth.controllers.AuthContext(**auth_context_params)
auth_context = auth.core.AuthContext(**auth_context_params)
token_id, token_data_ = self.token_provider_api.issue_token(
user_ref['id'], method_names, auth_context=auth_context)