Merge "PCI-DSS Lockout requirements"

This commit is contained in:
Jenkins 2016-08-05 06:28:09 +00:00 committed by Gerrit Code Review
commit 910e6e0a12
8 changed files with 225 additions and 9 deletions

View File

@ -0,0 +1,26 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sql
def upgrade(migrate_engine):
meta = sql.MetaData()
meta.bind = migrate_engine
failed_auth_count = sql.Column('failed_auth_count', sql.Integer,
nullable=True)
failed_auth_at = sql.Column('failed_auth_at', sql.DateTime(),
nullable=True)
local_user_table = sql.Table('local_user', meta, autoload=True)
local_user_table.create_column(failed_auth_count)
local_user_table.create_column(failed_auth_at)

View File

@ -30,14 +30,16 @@ may not match the value of the user's `enabled` column in the user table.
lockout_failure_attempts = cfg.IntOpt( lockout_failure_attempts = cfg.IntOpt(
'lockout_failure_attempts', 'lockout_failure_attempts',
default=0, default=None,
min=0, min=1,
help=utils.fmt(""" help=utils.fmt("""
The maximum number of times that a user can fail to authenticate before the The maximum number of times that a user can fail to authenticate before the
user account is locked for the number of seconds specified by user account is locked for the number of seconds specified by
`[security_compliance] lockout_duration`. Setting this value to zero (the `[security_compliance] lockout_duration`. This feature is disabled by
default) disables this feature. This feature depends on the `sql` backend for default. If this feature is enabled and `[security_compliance]
the `[identity] driver`. lockout_duration` is not set, then users may be locked out indefinitely
until the user is explicitly enabled via the API. This feature depends on
the `sql` backend for the `[identity] driver`.
""")) """))
lockout_duration = cfg.IntOpt( lockout_duration = cfg.IntOpt(

View File

@ -249,6 +249,10 @@ class UserDisabled(Unauthorized):
message_format = _("The account is disabled for user: %(user_id)s") message_format = _("The account is disabled for user: %(user_id)s")
class AccountLocked(Unauthorized):
message_format = _("The account is locked for user: %(user_id)s")
class AuthMethodNotSupported(AuthPluginException): class AuthMethodNotSupported(AuthPluginException):
message_format = _("Attempted to authenticate with an unsupported method.") message_format = _("Attempted to authenticate with an unsupported method.")

View File

@ -14,6 +14,7 @@
import datetime import datetime
from oslo_log import log
import sqlalchemy import sqlalchemy
from keystone.common import driver_hints from keystone.common import driver_hints
@ -27,6 +28,7 @@ from keystone.identity.backends import sql_model as model
CONF = keystone.conf.CONF CONF = keystone.conf.CONF
LOG = log.getLogger(__name__)
class Identity(base.IdentityDriverV8): class Identity(base.IdentityDriverV8):
@ -58,12 +60,58 @@ class Identity(base.IdentityDriverV8):
user_ref = self._get_user(session, user_id) user_ref = self._get_user(session, user_id)
except exception.UserNotFound: except exception.UserNotFound:
raise AssertionError(_('Invalid user / password')) raise AssertionError(_('Invalid user / password'))
if not self._check_password(password, user_ref): if self._is_account_locked(user_id, user_ref):
raise exception.AccountLocked(user_id=user_id)
elif not self._check_password(password, user_ref):
self._record_failed_auth(user_id)
raise AssertionError(_('Invalid user / password')) raise AssertionError(_('Invalid user / password'))
elif not user_ref.enabled: elif not user_ref.enabled:
raise exception.UserDisabled(user_id=user_id) raise exception.UserDisabled(user_id=user_id)
# successful auth, reset failed count if present
if user_ref.local_user.failed_auth_count:
self._reset_failed_auth(user_id)
return base.filter_user(user_ref.to_dict()) return base.filter_user(user_ref.to_dict())
def _is_account_locked(self, user_id, user_ref):
"""Check if the user account is locked.
Checks if the user account is locked based on the number of failed
authentication attempts.
:param user_id: The user ID
:param user_ref: Reference to the user object
:returns Boolean: True if the account is locked; False otherwise
"""
attempts = user_ref.local_user.failed_auth_count or 0
max_attempts = CONF.security_compliance.lockout_failure_attempts
lockout_duration = CONF.security_compliance.lockout_duration
if max_attempts and (attempts >= max_attempts):
if not lockout_duration:
return True
else:
delta = datetime.timedelta(seconds=lockout_duration)
last_failure = user_ref.local_user.failed_auth_at
if (last_failure + delta) > datetime.datetime.utcnow():
return True
else:
self._reset_failed_auth(user_id)
return False
def _record_failed_auth(self, user_id):
with sql.session_for_write() as session:
user_ref = session.query(model.User).get(user_id)
if not user_ref.local_user.failed_auth_count:
user_ref.local_user.failed_auth_count = 0
user_ref.local_user.failed_auth_count += 1
user_ref.local_user.failed_auth_at = datetime.datetime.utcnow()
def _reset_failed_auth(self, user_id):
with sql.session_for_write() as session:
user_ref = session.query(model.User).get(user_id)
user_ref.local_user.failed_auth_count = 0
user_ref.local_user.failed_auth_at = None
# user crud # user crud
@sql.handle_conflicts(conflict_type='user') @sql.handle_conflicts(conflict_type='user')

View File

@ -150,6 +150,9 @@ class User(sql.ModelBase, sql.DictBase):
if (value and if (value and
CONF.security_compliance.disable_user_account_days_inactive): CONF.security_compliance.disable_user_account_days_inactive):
self.last_active_at = datetime.datetime.utcnow().date() self.last_active_at = datetime.datetime.utcnow().date()
if value and self.local_user:
self.local_user.failed_auth_count = 0
self.local_user.failed_auth_at = None
self._enabled = value self._enabled = value
@enabled.expression @enabled.expression
@ -177,6 +180,8 @@ class LocalUser(sql.ModelBase, sql.DictBase):
lazy='subquery', lazy='subquery',
backref='local_user', backref='local_user',
order_by='Password.created_at') order_by='Password.created_at')
failed_auth_count = sql.Column(sql.Integer, nullable=True)
failed_auth_at = sql.Column(sql.DateTime, nullable=True)
__table_args__ = (sql.UniqueConstraint('domain_id', 'name'), {}) __table_args__ = (sql.UniqueConstraint('domain_id', 'name'), {})

View File

@ -13,17 +13,18 @@
import datetime import datetime
import uuid import uuid
from oslo_config import cfg import freezegun
from keystone.common import sql from keystone.common import sql
from keystone.common import utils from keystone.common import utils
import keystone.conf
from keystone import exception from keystone import exception
from keystone.identity.backends import base from keystone.identity.backends import base
from keystone.identity.backends import sql_model as model from keystone.identity.backends import sql_model as model
from keystone.tests.unit import test_backend_sql from keystone.tests.unit import test_backend_sql
CONF = cfg.CONF CONF = keystone.conf.CONF
class DisableInactiveUserTests(test_backend_sql.SqlTests): class DisableInactiveUserTests(test_backend_sql.SqlTests):
@ -276,3 +277,114 @@ class PasswordHistoryValidationTests(test_backend_sql.SqlTests):
def _get_user_ref(self, user_id): def _get_user_ref(self, user_id):
with sql.session_for_read() as session: with sql.session_for_read() as session:
return self.identity_api._get_user(session, user_id) return self.identity_api._get_user(session, user_id)
class LockingOutUserTests(test_backend_sql.SqlTests):
def setUp(self):
super(LockingOutUserTests, self).setUp()
self.config_fixture.config(
group='security_compliance',
lockout_failure_attempts=6)
self.config_fixture.config(
group='security_compliance',
lockout_duration=5)
# create user
self.password = uuid.uuid4().hex
user_dict = {
'name': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id,
'enabled': True,
'password': self.password
}
self.user = self.identity_api.create_user(user_dict)
def test_locking_out_user_after_max_failed_attempts(self):
# authenticate with wrong password
self.assertRaises(AssertionError,
self.identity_api.authenticate,
self.make_request(),
user_id=self.user['id'],
password=uuid.uuid4().hex)
# authenticate with correct password
self.identity_api.authenticate(self.make_request(),
user_id=self.user['id'],
password=self.password)
# test locking out user after max failed attempts
self._fail_auth_repeatedly(self.user['id'])
self.assertRaises(exception.AccountLocked,
self.identity_api.authenticate,
self.make_request(),
user_id=self.user['id'],
password=uuid.uuid4().hex)
def test_set_enabled_unlocks_user(self):
# lockout user
self._fail_auth_repeatedly(self.user['id'])
self.assertRaises(exception.AccountLocked,
self.identity_api.authenticate,
self.make_request(),
user_id=self.user['id'],
password=uuid.uuid4().hex)
# set enabled, user should be unlocked
self.user['enabled'] = True
self.identity_api.update_user(self.user['id'], self.user)
user_ret = self.identity_api.authenticate(self.make_request(),
user_id=self.user['id'],
password=self.password)
self.assertTrue(user_ret['enabled'])
def test_lockout_duration(self):
# freeze time
with freezegun.freeze_time(datetime.datetime.utcnow()) as frozen_time:
# lockout user
self._fail_auth_repeatedly(self.user['id'])
self.assertRaises(exception.AccountLocked,
self.identity_api.authenticate,
self.make_request(),
user_id=self.user['id'],
password=uuid.uuid4().hex)
# freeze time past the duration, user should be unlocked and failed
# auth count should get reset
frozen_time.tick(delta=datetime.timedelta(
seconds=CONF.security_compliance.lockout_duration + 1))
self.identity_api.authenticate(self.make_request(),
user_id=self.user['id'],
password=self.password)
# test failed auth count was reset by authenticating with the wrong
# password, should raise an assertion error and not account locked
self.assertRaises(AssertionError,
self.identity_api.authenticate,
self.make_request(),
user_id=self.user['id'],
password=uuid.uuid4().hex)
def test_lockout_duration_failed_auth_cnt_resets(self):
# freeze time
with freezegun.freeze_time(datetime.datetime.utcnow()) as frozen_time:
# lockout user
self._fail_auth_repeatedly(self.user['id'])
self.assertRaises(exception.AccountLocked,
self.identity_api.authenticate,
self.make_request(),
user_id=self.user['id'],
password=uuid.uuid4().hex)
# freeze time past the duration, failed_auth_cnt should reset
frozen_time.tick(delta=datetime.timedelta(
seconds=CONF.security_compliance.lockout_duration + 1))
# repeat failed auth the max times
self._fail_auth_repeatedly(self.user['id'])
# test user account is locked
self.assertRaises(exception.AccountLocked,
self.identity_api.authenticate,
self.make_request(),
user_id=self.user['id'],
password=uuid.uuid4().hex)
def _fail_auth_repeatedly(self, user_id):
wrong_password = uuid.uuid4().hex
for _ in range(CONF.security_compliance.lockout_failure_attempts):
self.assertRaises(AssertionError,
self.identity_api.authenticate,
self.make_request(),
user_id=user_id,
password=wrong_password)

View File

@ -141,7 +141,9 @@ class SqlModels(SqlTests):
cols = (('id', sql.Integer, None), cols = (('id', sql.Integer, None),
('user_id', sql.String, 64), ('user_id', sql.String, 64),
('name', sql.String, 255), ('name', sql.String, 255),
('domain_id', sql.String, 64)) ('domain_id', sql.String, 64),
('failed_auth_count', sql.Integer, None),
('failed_auth_at', sql.DateTime, None))
self.assertExpectedSchema('local_user', cols) self.assertExpectedSchema('local_user', cols)
def test_password_model(self): def test_password_model(self):

View File

@ -1462,6 +1462,23 @@ class SqlUpgradeTests(SqlMigrateBase):
'created_at', 'created_at',
'last_active_at']) 'last_active_at'])
def test_migration_108_add_failed_auth_columns(self):
self.upgrade(107)
table_name = 'local_user'
self.assertTableColumns(table_name,
['id',
'user_id',
'domain_id',
'name'])
self.upgrade(108)
self.assertTableColumns(table_name,
['id',
'user_id',
'domain_id',
'name',
'failed_auth_count',
'failed_auth_at'])
class MySQLOpportunisticUpgradeTestCase(SqlUpgradeTests): class MySQLOpportunisticUpgradeTestCase(SqlUpgradeTests):
FIXTURE = test_base.MySQLOpportunisticFixture FIXTURE = test_base.MySQLOpportunisticFixture