Process and validate auth methods against MFA rules
Process and validate the auth methods used by a given auth request against the user's MFA ruleset(s) and ensure the required auth methods have been used. If insufficient auth methods are used an "InsufficientAuthMethods" exception is raised (401) indicating the reason for failure. Change-Id: I2a83aa164f1c17352807188bfbaae17d909b3e5f bp: per-user-auth-plugin-reqs
This commit is contained in:
parent
2e7c7c955e
commit
b17c3a5032
|
@ -12,4 +12,8 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# NOTE(notmorgan): Be careful in adjusting whitespace here, flake8 checks
|
||||
# get cranky.
|
||||
from keystone.auth import core # noqa
|
||||
|
||||
from keystone.auth import controllers # noqa
|
||||
|
|
|
@ -16,12 +16,10 @@ import sys
|
|||
|
||||
from keystoneclient.common import cms
|
||||
from oslo_log import log
|
||||
from oslo_log import versionutils
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import importutils
|
||||
import six
|
||||
import stevedore
|
||||
|
||||
from keystone.auth import core
|
||||
from keystone.auth import schema
|
||||
from keystone.common import controller
|
||||
from keystone.common import dependency
|
||||
|
@ -39,54 +37,15 @@ LOG = log.getLogger(__name__)
|
|||
|
||||
CONF = keystone.conf.CONF
|
||||
|
||||
# registry of authentication methods
|
||||
AUTH_METHODS = {}
|
||||
AUTH_PLUGINS_LOADED = False
|
||||
# TODO(notmorgan): Update all references to the following functions to
|
||||
# reference auth.core instead of auth.controllers
|
||||
get_auth_method = core.get_auth_method
|
||||
load_auth_method = core.load_auth_method
|
||||
load_auth_methods = core.load_auth_methods
|
||||
|
||||
|
||||
def load_auth_method(method):
|
||||
plugin_name = CONF.auth.get(method) or 'default'
|
||||
namespace = 'keystone.auth.%s' % method
|
||||
try:
|
||||
driver_manager = stevedore.DriverManager(namespace, plugin_name,
|
||||
invoke_on_load=True)
|
||||
return driver_manager.driver
|
||||
except RuntimeError:
|
||||
LOG.debug('Failed to load the %s driver (%s) using stevedore, will '
|
||||
'attempt to load using import_object instead.',
|
||||
method, plugin_name)
|
||||
|
||||
driver = importutils.import_object(plugin_name)
|
||||
|
||||
msg = (_(
|
||||
'Direct import of auth plugin %(name)r is deprecated as of Liberty in '
|
||||
'favor of its entrypoint from %(namespace)r and may be removed in '
|
||||
'N.') %
|
||||
{'name': plugin_name, 'namespace': namespace})
|
||||
versionutils.report_deprecated_feature(LOG, msg)
|
||||
|
||||
return driver
|
||||
|
||||
|
||||
def load_auth_methods():
|
||||
global AUTH_PLUGINS_LOADED
|
||||
|
||||
if AUTH_PLUGINS_LOADED:
|
||||
# Only try and load methods a single time.
|
||||
return
|
||||
# config.setup_authentication should be idempotent, call it to ensure we
|
||||
# have setup all the appropriate configuration options we may need.
|
||||
keystone.conf.auth.setup_authentication()
|
||||
for plugin in set(CONF.auth.methods):
|
||||
AUTH_METHODS[plugin] = load_auth_method(plugin)
|
||||
AUTH_PLUGINS_LOADED = True
|
||||
|
||||
|
||||
def get_auth_method(method_name):
|
||||
global AUTH_METHODS
|
||||
if method_name not in AUTH_METHODS:
|
||||
raise exception.AuthMethodNotSupported()
|
||||
return AUTH_METHODS[method_name]
|
||||
# TODO(notmorgan): Move Common Auth Code (AuthContext and AuthInfo)
|
||||
# loading into keystone.auth.core (and update all references to the new
|
||||
# locations)
|
||||
|
||||
|
||||
class AuthContext(dict):
|
||||
|
@ -279,7 +238,7 @@ class AuthInfo(object):
|
|||
|
||||
# make sure auth method is supported
|
||||
for method_name in self.get_method_names():
|
||||
if method_name not in AUTH_METHODS:
|
||||
if method_name not in core.AUTH_METHODS:
|
||||
raise exception.AuthMethodNotSupported()
|
||||
|
||||
def _validate_and_normalize_auth_data(self, scope_only=False):
|
||||
|
@ -427,6 +386,7 @@ class Auth(controller.V3Controller):
|
|||
def __init__(self, *args, **kw):
|
||||
super(Auth, self).__init__(*args, **kw)
|
||||
keystone.conf.auth.setup_authentication()
|
||||
self._mfa_rules_validator = core.UserMFARulesValidator()
|
||||
|
||||
def authenticate_for_token(self, request, auth=None):
|
||||
"""Authenticate user and issue a token."""
|
||||
|
@ -447,10 +407,18 @@ class Auth(controller.V3Controller):
|
|||
|
||||
# NOTE(notmorgan): only methods that actually run and succeed will
|
||||
# be in the auth_context['method_names'] list. Do not blindly take
|
||||
# the values from auth_info, look at the authoritative values.
|
||||
method_names = auth_context.get('method_names', [])
|
||||
# make sure the list is unique
|
||||
method_names = list(set(method_names))
|
||||
# the values from auth_info, look at the authoritative values. Make
|
||||
# sure the set is unique.
|
||||
method_names_set = set(auth_context.get('method_names', []))
|
||||
method_names = list(method_names_set)
|
||||
|
||||
# Do MFA Rule Validation for the user
|
||||
if not self._mfa_rules_validator.check_auth_methods_against_rules(
|
||||
auth_context['user_id'], method_names_set):
|
||||
raise exception.InsufficientAuthMethods(
|
||||
user_id=auth_context['user_id'],
|
||||
methods='[%s]' % ','.join(auth_info.get_method_names()))
|
||||
|
||||
expires_at = auth_context.get('expires_at')
|
||||
token_audit_id = auth_context.get('audit_id')
|
||||
|
||||
|
|
|
@ -0,0 +1,196 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_log import versionutils
|
||||
from oslo_utils import importutils
|
||||
import six
|
||||
import stevedore
|
||||
|
||||
from keystone.common import dependency
|
||||
import keystone.conf
|
||||
from keystone import exception
|
||||
from keystone.i18n import _, _LI, _LE
|
||||
from keystone.identity.backends import resource_options as ro
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
CONF = keystone.conf.CONF
|
||||
|
||||
# registry of authentication methods
|
||||
AUTH_METHODS = {}
|
||||
AUTH_PLUGINS_LOADED = False
|
||||
|
||||
|
||||
def load_auth_method(method):
|
||||
plugin_name = CONF.auth.get(method) or 'default'
|
||||
namespace = 'keystone.auth.%s' % method
|
||||
try:
|
||||
driver_manager = stevedore.DriverManager(namespace, plugin_name,
|
||||
invoke_on_load=True)
|
||||
return driver_manager.driver
|
||||
except RuntimeError:
|
||||
LOG.debug('Failed to load the %s driver (%s) using stevedore, will '
|
||||
'attempt to load using import_object instead.',
|
||||
method, plugin_name)
|
||||
|
||||
driver = importutils.import_object(plugin_name)
|
||||
|
||||
msg = (_(
|
||||
'Direct import of auth plugin %(name)r is deprecated as of Liberty in '
|
||||
'favor of its entrypoint from %(namespace)r and may be removed in '
|
||||
'N.') %
|
||||
{'name': plugin_name, 'namespace': namespace})
|
||||
versionutils.report_deprecated_feature(LOG, msg)
|
||||
|
||||
return driver
|
||||
|
||||
|
||||
def load_auth_methods():
|
||||
global AUTH_PLUGINS_LOADED
|
||||
|
||||
if AUTH_PLUGINS_LOADED:
|
||||
# Only try and load methods a single time.
|
||||
return
|
||||
# config.setup_authentication should be idempotent, call it to ensure we
|
||||
# have setup all the appropriate configuration options we may need.
|
||||
keystone.conf.auth.setup_authentication()
|
||||
for plugin in set(CONF.auth.methods):
|
||||
AUTH_METHODS[plugin] = load_auth_method(plugin)
|
||||
AUTH_PLUGINS_LOADED = True
|
||||
|
||||
|
||||
def get_auth_method(method_name):
|
||||
global AUTH_METHODS
|
||||
if method_name not in AUTH_METHODS:
|
||||
raise exception.AuthMethodNotSupported()
|
||||
return AUTH_METHODS[method_name]
|
||||
|
||||
|
||||
@dependency.requires('identity_api')
|
||||
class UserMFARulesValidator(object):
|
||||
"""Helper object that can validate the MFA Rules."""
|
||||
|
||||
@property
|
||||
def _auth_methods(self):
|
||||
if AUTH_PLUGINS_LOADED:
|
||||
return set(AUTH_METHODS.keys())
|
||||
raise RuntimeError(_('Auth Method Plugins are not loaded.'))
|
||||
|
||||
def check_auth_methods_against_rules(self, user_id, auth_methods):
|
||||
"""Validate the MFA rules against the successful auth methods.
|
||||
|
||||
:param user_id: The user's ID (uuid).
|
||||
:type user_id: str
|
||||
:param auth_methods: List of methods that were used for auth
|
||||
:type auth_methods: set
|
||||
:returns: Boolean, ``True`` means rules match and auth may proceed,
|
||||
``False`` means rules do not match.
|
||||
"""
|
||||
user_ref = self.identity_api.get_user(user_id)
|
||||
mfa_rules = user_ref['options'].get(ro.MFA_RULES_OPT.option_name, [])
|
||||
mfa_rules_enabled = user_ref['options'].get(
|
||||
ro.MFA_ENABLED_OPT.option_name, True)
|
||||
rules = self._parse_rule_structure(mfa_rules, user_ref['id'])
|
||||
|
||||
if not rules or not mfa_rules_enabled:
|
||||
# return quickly if the rules are disabled for the user or not set
|
||||
LOG.debug('MFA Rules not processed for user `%(user_id)s`. '
|
||||
'Rule list: `%(rules)s` (Enabled: `%(enabled)s`).',
|
||||
{'user_id': user_id,
|
||||
'rules': mfa_rules,
|
||||
'enabled': mfa_rules_enabled})
|
||||
return True
|
||||
|
||||
for r in rules:
|
||||
# NOTE(notmorgan): We only check against the actually loaded
|
||||
# auth methods meaning that the keystone administrator may
|
||||
# disable an auth method, and a rule will still pass making it
|
||||
# impossible to accidently lock-out a subset of users with a
|
||||
# bad keystone.conf
|
||||
r_set = set(r).intersection(self._auth_methods)
|
||||
if set(auth_methods).issuperset(r_set):
|
||||
# Rule Matches no need to continue, return here.
|
||||
LOG.debug('Auth methods for user `%(user_id)s`, `%(methods)s` '
|
||||
'matched MFA rule `%(rule)s`. Loaded '
|
||||
'auth_methods: `%(loaded)s`',
|
||||
{'user_id': user_id,
|
||||
'rule': list(r_set),
|
||||
'methods': auth_methods,
|
||||
'loaded': self._auth_methods})
|
||||
return True
|
||||
|
||||
LOG.debug('Auth methods for user `%(user_id)s`, `%(methods)s` did not '
|
||||
'match a MFA rule in `%(rules)s`.',
|
||||
{'user_id': user_id,
|
||||
'methods': auth_methods,
|
||||
'rules': rules})
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _parse_rule_structure(rules, user_id):
|
||||
"""Validate and parse the rule data structure.
|
||||
|
||||
Rule sets must be in the form of list of lists. The lists may not
|
||||
have duplicates and must not be empty. The top-level list may be empty
|
||||
indicating that no rules exist.
|
||||
|
||||
:param rules: The list of rules from the user_ref
|
||||
:type rules: list
|
||||
:param user_id: the user_id, used for logging purposes
|
||||
:type user_id: str
|
||||
:returns: list of list, duplicates are stripped
|
||||
"""
|
||||
# NOTE(notmorgan): Most of this is done at the API request validation
|
||||
# and in the storage layer, it makes sense to also validate here and
|
||||
# ensure the data returned from the DB is sane, This will not raise
|
||||
# any exceptions, but just produce a usable set of data for rules
|
||||
# processing.
|
||||
rule_set = []
|
||||
found_rules = set()
|
||||
if not isinstance(rules, list):
|
||||
LOG.error(_LE('Corrupt rule data structure for user %(user_id)s, '
|
||||
'no rules loaded.'),
|
||||
{'user_id': user_id})
|
||||
return rule_set
|
||||
elif not rules:
|
||||
return rule_set
|
||||
|
||||
for r_list in rules:
|
||||
if not isinstance(r_list, list):
|
||||
LOG.info(_LI('Ignoring Rule %(rule)r; rule must be a list of '
|
||||
'strings.'),
|
||||
{'type': type(r_list)})
|
||||
continue
|
||||
|
||||
if r_list:
|
||||
# No empty rules are allowed.
|
||||
_ok_rule = True
|
||||
for item in r_list:
|
||||
if not isinstance(item, six.string_types):
|
||||
# Rules may only contain strings for method names
|
||||
# Reject a rule with non-string values
|
||||
LOG.info(_LI('Ignoring Rule %(rule)r; rule contains '
|
||||
'non-string values.'),
|
||||
{'rule': r_list})
|
||||
_ok_rule = False
|
||||
break
|
||||
if _ok_rule:
|
||||
# De-dupe rule and add to the return value
|
||||
rule_string = ';'.join(sorted(r_list))
|
||||
if rule_string not in found_rules:
|
||||
found_rules.add(rule_string)
|
||||
r_list = list(set(r_list))
|
||||
rule_set.append(r_list)
|
||||
|
||||
return rule_set
|
|
@ -253,6 +253,15 @@ class Unauthorized(SecurityError):
|
|||
title = 'Unauthorized'
|
||||
|
||||
|
||||
class InsufficientAuthMethods(Error):
|
||||
# NOTE(notmorgan): This is not a security error, this is meant to
|
||||
# communicate real information back to the user.
|
||||
message_format = _("Insufficient auth methods received for %(user_id)s. "
|
||||
"Auth Methods Provided: %(methods)s.")
|
||||
code = 401
|
||||
title = 'Unauthorized'
|
||||
|
||||
|
||||
class PasswordExpired(Unauthorized):
|
||||
message_format = _("The password is expired and needs to be changed for "
|
||||
"user: %(user_id)s.")
|
||||
|
|
|
@ -30,22 +30,22 @@ def _mfa_rules_validator_list_of_lists_of_strings_no_duplicates(value):
|
|||
'duplicated.')
|
||||
if not isinstance(value, list):
|
||||
raise TypeError(msg)
|
||||
sublist_set = set()
|
||||
sublists = []
|
||||
for item in value:
|
||||
string_set = set()
|
||||
if not isinstance(item, list):
|
||||
raise TypeError(msg)
|
||||
if not item:
|
||||
raise ValueError(msg)
|
||||
if item in sublist_set:
|
||||
if item in sublists:
|
||||
raise ValueError(msg)
|
||||
sublist_set.add(sublist_set)
|
||||
sublists.append(sublists)
|
||||
for element in item:
|
||||
if not isinstance(element, six.string_types):
|
||||
raise TypeError(msg)
|
||||
if element in string_set:
|
||||
raise ValueError(msg)
|
||||
sublist_set.add(element)
|
||||
string_set.add(element)
|
||||
|
||||
|
||||
USER_OPTIONS_REGISTRY = resource_options.ResourceOptionRegistry('USER')
|
||||
|
|
|
@ -48,19 +48,19 @@ class LoadAuthPlugins(fixtures.Fixture):
|
|||
def setUp(self):
|
||||
super(LoadAuthPlugins, self).setUp()
|
||||
|
||||
AUTH_METHODS = auth.controllers.AUTH_METHODS
|
||||
AUTH_METHODS = auth.core.AUTH_METHODS
|
||||
for method_name in self.method_names:
|
||||
if method_name in AUTH_METHODS:
|
||||
self.saved[method_name] = AUTH_METHODS[method_name]
|
||||
AUTH_METHODS[method_name] = auth.controllers.load_auth_method(
|
||||
AUTH_METHODS[method_name] = auth.core.load_auth_method(
|
||||
method_name)
|
||||
auth.controllers.AUTH_PLUGINS_LOADED = True
|
||||
auth.core.AUTH_PLUGINS_LOADED = True
|
||||
|
||||
def cleanUp(self):
|
||||
AUTH_METHODS = auth.controllers.AUTH_METHODS
|
||||
AUTH_METHODS = auth.core.AUTH_METHODS
|
||||
for method_name in list(AUTH_METHODS):
|
||||
if method_name in self.saved:
|
||||
AUTH_METHODS[method_name] = self.saved[method_name]
|
||||
else:
|
||||
del AUTH_METHODS[method_name]
|
||||
auth.controllers.AUTH_PLUGINS_LOADED = False
|
||||
auth.core.AUTH_PLUGINS_LOADED = False
|
||||
|
|
|
@ -51,5 +51,5 @@ class BackendLoader(fixtures.Fixture):
|
|||
del self._testcase # break circular reference
|
||||
|
||||
def clear_auth_plugin_registry(self):
|
||||
auth.controllers.AUTH_METHODS.clear()
|
||||
auth.controllers.AUTH_PLUGINS_LOADED = False
|
||||
auth.core.AUTH_METHODS.clear()
|
||||
auth.core.AUTH_PLUGINS_LOADED = False
|
||||
|
|
|
@ -115,7 +115,7 @@ class TestAuthPlugin(unit.SQLDriverOverrides, unit.TestCase):
|
|||
auth_plugins.ConfigAuthPlugins(self.config_fixture,
|
||||
['external', 'external']))
|
||||
auth.controllers.load_auth_methods()
|
||||
self.assertIn('external', auth.controllers.AUTH_METHODS)
|
||||
self.assertIn('external', auth.core.AUTH_METHODS)
|
||||
|
||||
|
||||
class TestAuthPluginDynamicOptions(TestAuthPlugin):
|
||||
|
|
|
@ -37,6 +37,7 @@ from keystone.common import utils
|
|||
import keystone.conf
|
||||
from keystone.credential.providers import fernet as credential_fernet
|
||||
from keystone import exception
|
||||
from keystone.identity.backends import resource_options as ro
|
||||
from keystone.policy.backends import rules
|
||||
from keystone.tests.common import auth as common_auth
|
||||
from keystone.tests import unit
|
||||
|
@ -47,6 +48,164 @@ from keystone.tests.unit import test_v3
|
|||
CONF = keystone.conf.CONF
|
||||
|
||||
|
||||
class TestMFARules(test_v3.RestfulTestCase, testcase.TestCase):
|
||||
def setUp(self):
|
||||
super(TestMFARules, self).setUp()
|
||||
auth.core.load_auth_methods()
|
||||
self.controller = auth.controllers.Auth()
|
||||
self.addCleanup(self.cleanup)
|
||||
|
||||
def cleanup(self):
|
||||
totp_creds = self.credential_api.list_credentials_for_user(
|
||||
self.user['id'], type='totp')
|
||||
|
||||
for cred in totp_creds:
|
||||
self.credential_api.delete_credential(cred['id'])
|
||||
|
||||
def auth_plugin_config_override(self, methods=None, **method_classes):
|
||||
methods = ['totp', 'token', 'password']
|
||||
super(TestMFARules, self).auth_plugin_config_override(methods)
|
||||
|
||||
def _update_user_with_MFA_rules(self, rule_list, rules_enabled=True):
|
||||
user = self.user.copy()
|
||||
# Do not update password
|
||||
user.pop('password')
|
||||
user['options'][ro.MFA_RULES_OPT.option_name] = rule_list
|
||||
user['options'][ro.MFA_ENABLED_OPT.option_name] = rules_enabled
|
||||
self.identity_api.update_user(user['id'], user)
|
||||
|
||||
def test_MFA_single_method_rules_requirements_met_succeeds(self):
|
||||
# ensure that a simple password works if a password-only rules exists
|
||||
rule_list = [['password'], ['password', 'totp']]
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
with freezegun.freeze_time(time):
|
||||
self.v3_create_token(
|
||||
self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
project_id=self.project_id))
|
||||
|
||||
def test_MFA_multi_method_rules_requirements_met_succeeds(self):
|
||||
# validate that multiple auth-methods function if all are specified
|
||||
# and the rules requires it
|
||||
rule_list = [['password', 'totp']]
|
||||
totp_cred = unit.new_totp_credential(self.user_id, self.project_id)
|
||||
self.credential_api.create_credential(uuid.uuid4().hex, totp_cred)
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
with freezegun.freeze_time(time):
|
||||
auth_req = self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
passcode=totp._generate_totp_passcode(totp_cred['blob']))
|
||||
self.v3_create_token(auth_req)
|
||||
|
||||
def test_MFA_single_method_rules_requirements_not_met_fails(self):
|
||||
# if a rule matching a single auth type is specified and is not matched
|
||||
# the result should be unauthorized
|
||||
rule_list = [['totp']]
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
with freezegun.freeze_time(time):
|
||||
self.v3_create_token(
|
||||
self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
project_id=self.project_id),
|
||||
expected_status=http_client.UNAUTHORIZED)
|
||||
|
||||
def test_MFA_multi_method_rules_requirements_not_met_fails(self):
|
||||
# if multiple rules are specified and only one is passed,
|
||||
# unauthorized is expected
|
||||
rule_list = [['password', 'totp']]
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
with freezegun.freeze_time(time):
|
||||
self.v3_create_token(
|
||||
self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
project_id=self.project_id),
|
||||
expected_status=http_client.UNAUTHORIZED)
|
||||
|
||||
def test_MFA_rules_bogus_non_existing_auth_method_succeeds(self):
|
||||
# Bogus auth methods are thrown out from rules.
|
||||
rule_list = [['password'], ['BoGusAuThMeTh0dHandl3r']]
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
with freezegun.freeze_time(time):
|
||||
self.v3_create_token(
|
||||
self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
project_id=self.project_id))
|
||||
|
||||
def test_MFA_rules_disabled_MFA_succeeeds(self):
|
||||
# ensure that if MFA is "disableD" authentication succeeds, even if
|
||||
# not enough auth methods are specified
|
||||
rule_list = [['password', 'totp']]
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list,
|
||||
rules_enabled=False)
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
with freezegun.freeze_time(time):
|
||||
self.v3_create_token(
|
||||
self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
project_id=self.project_id))
|
||||
|
||||
def test_MFA_rules_all_bogus_rules_results_in_default_behavior(self):
|
||||
# if all the rules are bogus, the result is the same as the default
|
||||
# behavior, any single password method is sufficient
|
||||
rule_list = [[uuid.uuid4().hex, uuid.uuid4().hex],
|
||||
['BoGus'],
|
||||
['NonExistantMethod']]
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
with freezegun.freeze_time(time):
|
||||
self.v3_create_token(
|
||||
self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
project_id=self.project_id))
|
||||
|
||||
|
||||
class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
|
||||
def setUp(self):
|
||||
super(TestAuthInfo, self).setUp()
|
||||
|
|
Loading…
Reference in New Issue