PCI-DSS Minimum password age requirements
The minimum password age is the period at which a password must be used before it can be changed. This prevents users from immediately wiping out their password history in order to use an old password. Partially-implements: blueprint pci-dss Change-Id: Ib1367bc69b791ef35de8f18704437e8fc233afdf
This commit is contained in:
parent
d082fb29ce
commit
b4ff783989
@ -0,0 +1,24 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import sqlalchemy as sql
|
||||
|
||||
|
||||
def upgrade(migrate_engine):
|
||||
meta = sql.MetaData()
|
||||
meta.bind = migrate_engine
|
||||
|
||||
self_service_column = sql.Column('self_service', sql.Boolean,
|
||||
nullable=False, server_default='0',
|
||||
default=False)
|
||||
password_table = sql.Table('password', meta, autoload=True)
|
||||
password_table.create_column(self_service_column)
|
@ -79,14 +79,19 @@ feature, values must be greater than 1. This feature depends on the `sql`
|
||||
backend for the `[identity] driver`.
|
||||
"""))
|
||||
|
||||
password_change_limit_per_day = cfg.IntOpt(
|
||||
'password_change_limit_per_day',
|
||||
minimum_password_age = cfg.IntOpt(
|
||||
'minimum_password_age',
|
||||
default=0,
|
||||
min=0,
|
||||
help=utils.fmt("""
|
||||
The maximum number of times a user can change their password in a single day.
|
||||
Setting the value to zero (the default) disables this feature. This feature
|
||||
depends on the `sql` backend for the `[identity] driver`.
|
||||
The number of days that a password must be used before the user can change it.
|
||||
This prevents users from changing their passwords immediately in order to wipe
|
||||
out their password history and reuse an old password. This feature does not
|
||||
prevent administrators from manually resetting passwords. It is disabled by
|
||||
default and allows for immediate password changes. This feature depends on the
|
||||
`sql` backend for the `[identity] driver`. Note: If `[security_compliance]
|
||||
password_expires_days` is set, then the value for this option should be less
|
||||
than the `password_expires_days`.
|
||||
"""))
|
||||
|
||||
password_regex = cfg.StrOpt(
|
||||
@ -118,7 +123,7 @@ ALL_OPTS = [
|
||||
lockout_duration,
|
||||
password_expires_days,
|
||||
unique_last_password_count,
|
||||
password_change_limit_per_day,
|
||||
minimum_password_age,
|
||||
password_regex,
|
||||
password_regex_description,
|
||||
]
|
||||
|
@ -94,7 +94,16 @@ class URLValidationError(ValidationError):
|
||||
|
||||
|
||||
class PasswordValidationError(ValidationError):
|
||||
message_format = _("Invalid password: %(detail)s")
|
||||
message_format = _("Password validation error: %(detail)s")
|
||||
|
||||
|
||||
class PasswordAgeValidationError(PasswordValidationError):
|
||||
message_format = _("You cannot change your password at this time due "
|
||||
"to the minimum password age. Once you change your "
|
||||
"password, it must be used for %(min_age_days)d day(s) "
|
||||
"before it can be changed. Please try again in "
|
||||
"%(days_left)d day(s) or contact your administrator to "
|
||||
"reset your password.")
|
||||
|
||||
|
||||
class SchemaValidationError(ValidationError):
|
||||
|
@ -271,6 +271,20 @@ class IdentityDriverV8(object):
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
||||
@abc.abstractmethod
|
||||
def change_password(self, user_id, new_password):
|
||||
"""Self-service password change.
|
||||
|
||||
:param str user_id: User ID.
|
||||
:param str new_password: New password.
|
||||
|
||||
:raises keystone.exception.UserNotFound: If the user doesn't exist.
|
||||
:raises keystone.exception.PasswordValidation: If password fails
|
||||
validation
|
||||
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
||||
@abc.abstractmethod
|
||||
def add_user_to_group(self, user_id, group_id):
|
||||
"""Add a user to a group.
|
||||
|
@ -116,6 +116,11 @@ class Identity(base.IdentityDriverV8):
|
||||
self.user.update(user_id, user, old_obj)
|
||||
return self.user.get_filtered(user_id)
|
||||
|
||||
def change_password(self, user_id, new_password):
|
||||
raise exception.NotImplemented(
|
||||
_('Self-service user password changes are not implemented for '
|
||||
'LDAP.'))
|
||||
|
||||
def delete_user(self, user_id):
|
||||
msg = _DEPRECATION_MSG % "delete_user"
|
||||
versionutils.report_deprecated_feature(LOG, msg)
|
||||
|
@ -188,6 +188,23 @@ class Identity(base.IdentityDriverV8):
|
||||
'%(unique_cnt)d') % {'unique_cnt': unique_cnt}
|
||||
raise exception.PasswordValidationError(detail=detail)
|
||||
|
||||
def change_password(self, user_id, new_password):
|
||||
with sql.session_for_write() as session:
|
||||
user_ref = session.query(model.User).get(user_id)
|
||||
if user_ref.password_ref and user_ref.password_ref.self_service:
|
||||
self._validate_minimum_password_age(user_ref)
|
||||
user_ref.password = utils.hash_password(new_password)
|
||||
user_ref.password_ref.self_service = True
|
||||
|
||||
def _validate_minimum_password_age(self, user_ref):
|
||||
min_age_days = CONF.security_compliance.minimum_password_age
|
||||
min_age = (user_ref.password_created_at +
|
||||
datetime.timedelta(days=min_age_days))
|
||||
if datetime.datetime.utcnow() < min_age:
|
||||
days_left = (min_age - datetime.datetime.utcnow()).days
|
||||
raise exception.PasswordAgeValidationError(
|
||||
min_age_days=min_age_days, days_left=days_left)
|
||||
|
||||
def add_user_to_group(self, user_id, group_id):
|
||||
with sql.session_for_write() as session:
|
||||
self.get_group(group_id)
|
||||
|
@ -86,6 +86,12 @@ class User(sql.ModelBase, sql.DictBase):
|
||||
return self.password_ref.password
|
||||
return None
|
||||
|
||||
@hybrid_property
|
||||
def password_created_at(self):
|
||||
if self.password_ref:
|
||||
return self.password_ref.created_at
|
||||
return None
|
||||
|
||||
@hybrid_property
|
||||
def password_expires_at(self):
|
||||
if self.password_ref:
|
||||
@ -230,6 +236,8 @@ class Password(sql.ModelBase, sql.DictBase):
|
||||
created_at = sql.Column(sql.DateTime, nullable=False,
|
||||
default=datetime.datetime.utcnow)
|
||||
expires_at = sql.Column(sql.DateTime, nullable=True)
|
||||
self_service = sql.Column(sql.Boolean, default=False, nullable=False,
|
||||
server_default='0')
|
||||
|
||||
|
||||
class FederatedUser(sql.ModelBase, sql.ModelDictMixin):
|
||||
|
@ -1237,8 +1237,11 @@ class Manager(manager.Manager):
|
||||
|
||||
validators.validate_password(new_password)
|
||||
|
||||
update_dict = {'password': new_password}
|
||||
self.update_user(user_id, update_dict)
|
||||
domain_id, driver, entity_id = (
|
||||
self._get_domain_driver_and_entity_id(user_id))
|
||||
driver.change_password(entity_id, new_password)
|
||||
notifications.Audit.updated(self._USER, user_id)
|
||||
self.emit_invalidate_user_token_persistence(user_id)
|
||||
|
||||
@MEMOIZE
|
||||
def _shadow_nonlocal_user(self, user):
|
||||
|
@ -22,6 +22,10 @@ class IdentityDriverV8Tests(object):
|
||||
# subclasses that don't allow name updates must set this to False.
|
||||
allows_name_update = True
|
||||
|
||||
# subclasses that don't allow self-service password changes must set this
|
||||
# to False.
|
||||
allows_self_service_change_password = True
|
||||
|
||||
# Subclasses must override this to indicate whether it's domain-aware or
|
||||
# not.
|
||||
expected_is_domain_aware = True
|
||||
@ -237,6 +241,18 @@ class IdentityDriverV8Tests(object):
|
||||
self.assertRaises(exception.Conflict, self.driver.update_user,
|
||||
user['id'], user_mod)
|
||||
|
||||
def test_change_password(self):
|
||||
if not self.allows_self_service_change_password:
|
||||
self.skipTest("Backend doesn't allow change password.")
|
||||
# create user
|
||||
password = uuid.uuid4().hex
|
||||
domain_id = uuid.uuid4().hex
|
||||
user = self.create_user(domain_id=domain_id, password=password)
|
||||
# change password
|
||||
new_password = uuid.uuid4().hex
|
||||
self.driver.change_password(user['id'], new_password)
|
||||
self.driver.authenticate(user['id'], new_password)
|
||||
|
||||
def test_delete_user(self):
|
||||
user = self.create_user()
|
||||
|
||||
|
@ -22,6 +22,7 @@ class TestIdentityDriver(core.BaseTestCase,
|
||||
test_base.IdentityDriverV8Tests):
|
||||
|
||||
allows_name_update = False
|
||||
allows_self_service_change_password = False
|
||||
expected_is_domain_aware = False
|
||||
expected_default_assignment_driver = 'sql'
|
||||
expected_is_sql = False
|
||||
|
@ -77,7 +77,7 @@ class DisableInactiveUserTests(test_backend_sql.SqlTests):
|
||||
last_active_at = (
|
||||
datetime.datetime.utcnow() -
|
||||
datetime.timedelta(self.max_inactive_days + 1)).date()
|
||||
self._update_user(user['id'], last_active_at)
|
||||
self._update_user_last_active_at(user['id'], last_active_at)
|
||||
# get user and verify that the user is actually disabled
|
||||
user = self.identity_api.get_user(user['id'])
|
||||
self.assertFalse(user['enabled'])
|
||||
@ -94,7 +94,7 @@ class DisableInactiveUserTests(test_backend_sql.SqlTests):
|
||||
last_active_at = (
|
||||
datetime.datetime.utcnow() -
|
||||
datetime.timedelta(self.max_inactive_days - 1)).date()
|
||||
self._update_user(user['id'], last_active_at)
|
||||
self._update_user_last_active_at(user['id'], last_active_at)
|
||||
# get user and verify that the user is still enabled
|
||||
user = self.identity_api.get_user(user['id'])
|
||||
self.assertTrue(user['enabled'])
|
||||
@ -147,7 +147,7 @@ class DisableInactiveUserTests(test_backend_sql.SqlTests):
|
||||
session.add(user_ref)
|
||||
return base.filter_user(user_ref.to_dict())
|
||||
|
||||
def _update_user(self, user_id, last_active_at):
|
||||
def _update_user_last_active_at(self, user_id, last_active_at):
|
||||
with sql.session_for_write() as session:
|
||||
user_ref = session.query(model.User).get(user_id)
|
||||
user_ref.last_active_at = last_active_at
|
||||
@ -465,3 +465,85 @@ class PasswordExpiresValidationTests(test_backend_sql.SqlTests):
|
||||
user_ref._get_password_expires_at(password_created_at))
|
||||
session.add(user_ref)
|
||||
return base.filter_user(user_ref.to_dict())
|
||||
|
||||
|
||||
class MinimumPasswordAgeTests(test_backend_sql.SqlTests):
|
||||
def setUp(self):
|
||||
super(MinimumPasswordAgeTests, self).setUp()
|
||||
self.config_fixture.config(
|
||||
group='security_compliance',
|
||||
minimum_password_age=1)
|
||||
self.initial_password = uuid.uuid4().hex
|
||||
self.user = self._create_new_user(self.initial_password)
|
||||
|
||||
def test_user_cannot_change_password_before_min_age(self):
|
||||
# user can change password after create
|
||||
new_password = uuid.uuid4().hex
|
||||
self.assertValidChangePassword(self.user['id'], self.initial_password,
|
||||
new_password)
|
||||
# user cannot change password before min age
|
||||
self.assertRaises(exception.PasswordAgeValidationError,
|
||||
self.identity_api.change_password,
|
||||
self.make_request(),
|
||||
user_id=self.user['id'],
|
||||
original_password=new_password,
|
||||
new_password=uuid.uuid4().hex)
|
||||
|
||||
def test_user_can_change_password_after_min_age(self):
|
||||
# user can change password after create
|
||||
new_password = uuid.uuid4().hex
|
||||
self.assertValidChangePassword(self.user['id'], self.initial_password,
|
||||
new_password)
|
||||
# set password_created_at so that the min password age has past
|
||||
password_created_at = (
|
||||
datetime.datetime.utcnow() -
|
||||
datetime.timedelta(
|
||||
days=CONF.security_compliance.minimum_password_age + 1))
|
||||
self._update_password_created_at(self.user['id'], password_created_at)
|
||||
# user can change their password after min password age has past
|
||||
self.assertValidChangePassword(self.user['id'], new_password,
|
||||
uuid.uuid4().hex)
|
||||
|
||||
def test_user_can_change_password_after_admin_reset(self):
|
||||
# user can change password after create
|
||||
new_password = uuid.uuid4().hex
|
||||
self.assertValidChangePassword(self.user['id'], self.initial_password,
|
||||
new_password)
|
||||
# user cannot change password before min age
|
||||
self.assertRaises(exception.PasswordAgeValidationError,
|
||||
self.identity_api.change_password,
|
||||
self.make_request(),
|
||||
user_id=self.user['id'],
|
||||
original_password=new_password,
|
||||
new_password=uuid.uuid4().hex)
|
||||
# admin reset
|
||||
new_password = uuid.uuid4().hex
|
||||
self.user['password'] = new_password
|
||||
self.identity_api.update_user(self.user['id'], self.user)
|
||||
# user can change password after admin reset
|
||||
self.assertValidChangePassword(self.user['id'], new_password,
|
||||
uuid.uuid4().hex)
|
||||
|
||||
def assertValidChangePassword(self, user_id, password, new_password):
|
||||
self.identity_api.change_password(self.make_request(),
|
||||
user_id=user_id,
|
||||
original_password=password,
|
||||
new_password=new_password)
|
||||
self.identity_api.authenticate(self.make_request(),
|
||||
user_id=user_id,
|
||||
password=new_password)
|
||||
|
||||
def _create_new_user(self, password):
|
||||
user = {
|
||||
'name': uuid.uuid4().hex,
|
||||
'domain_id': CONF.identity.default_domain_id,
|
||||
'enabled': True,
|
||||
'password': password
|
||||
}
|
||||
return self.identity_api.create_user(user)
|
||||
|
||||
def _update_password_created_at(self, user_id, password_create_at):
|
||||
with sql.session_for_write() as session:
|
||||
user_ref = session.query(model.User).get(user_id)
|
||||
for password_ref in user_ref.local_user.passwords:
|
||||
password_ref.created_at = password_create_at
|
||||
|
@ -151,7 +151,8 @@ class SqlModels(SqlTests):
|
||||
('local_user_id', sql.Integer, None),
|
||||
('password', sql.String, 128),
|
||||
('created_at', sql.DateTime, None),
|
||||
('expires_at', sql.DateTime, None))
|
||||
('expires_at', sql.DateTime, None),
|
||||
('self_service', sql.Boolean, False))
|
||||
self.assertExpectedSchema('password', cols)
|
||||
|
||||
def test_federated_user_model(self):
|
||||
|
@ -1479,6 +1479,24 @@ class SqlUpgradeTests(SqlMigrateBase):
|
||||
'failed_auth_count',
|
||||
'failed_auth_at'])
|
||||
|
||||
def test_migration_109_add_password_self_service_column(self):
|
||||
password_table = 'password'
|
||||
self.upgrade(108)
|
||||
self.assertTableColumns(password_table,
|
||||
['id',
|
||||
'local_user_id',
|
||||
'password',
|
||||
'created_at',
|
||||
'expires_at'])
|
||||
self.upgrade(109)
|
||||
self.assertTableColumns(password_table,
|
||||
['id',
|
||||
'local_user_id',
|
||||
'password',
|
||||
'created_at',
|
||||
'expires_at',
|
||||
'self_service'])
|
||||
|
||||
|
||||
class MySQLOpportunisticUpgradeTestCase(SqlUpgradeTests):
|
||||
FIXTURE = test_base.MySQLOpportunisticFixture
|
||||
|
@ -455,6 +455,25 @@ class IdentityTestCase(test_v3.RestfulTestCase):
|
||||
password=new_password)
|
||||
self.v3_create_token(new_password_auth)
|
||||
|
||||
def test_admin_password_reset_with_min_password_age_enabled(self):
|
||||
# enable minimum_password_age, this should have no effect on admin
|
||||
# password reset
|
||||
self.config_fixture.config(group='security_compliance',
|
||||
minimum_password_age=1)
|
||||
# create user
|
||||
user_ref = unit.create_user(self.identity_api,
|
||||
domain_id=self.domain['id'])
|
||||
# administrative password reset
|
||||
new_password = uuid.uuid4().hex
|
||||
r = self.patch('/users/%s' % user_ref['id'],
|
||||
body={'user': {'password': new_password}})
|
||||
self.assertValidUserResponse(r, user_ref)
|
||||
# authenticate with new password
|
||||
new_password_auth = self.build_authentication_request(
|
||||
user_id=user_ref['id'],
|
||||
password=new_password)
|
||||
self.v3_create_token(new_password_auth)
|
||||
|
||||
def test_update_user_domain_id(self):
|
||||
"""Call ``PATCH /users/{user_id}`` with domain_id."""
|
||||
user = unit.new_user_ref(domain_id=self.domain['id'])
|
||||
@ -792,6 +811,27 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase):
|
||||
self.get_request_token(new_password,
|
||||
expected_status=http_client.CREATED)
|
||||
|
||||
def test_changing_password_with_min_password_age(self):
|
||||
# enable minimum_password_age and attempt to change password
|
||||
new_password = uuid.uuid4().hex
|
||||
self.config_fixture.config(group='security_compliance',
|
||||
minimum_password_age=1)
|
||||
# able to change password after create user
|
||||
self.change_password(password=new_password,
|
||||
original_password=self.user_ref['password'],
|
||||
expected_status=http_client.NO_CONTENT)
|
||||
# 2nd change password should fail due to minimum password age
|
||||
self.token = self.get_request_token(new_password, http_client.CREATED)
|
||||
self.change_password(password=uuid.uuid4().hex,
|
||||
original_password=new_password,
|
||||
expected_status=http_client.BAD_REQUEST)
|
||||
# disable minimum_password_age and attempt to change password
|
||||
self.config_fixture.config(group='security_compliance',
|
||||
minimum_password_age=0)
|
||||
self.change_password(password=uuid.uuid4().hex,
|
||||
original_password=new_password,
|
||||
expected_status=http_client.NO_CONTENT)
|
||||
|
||||
def test_changing_password_with_missing_original_password_fails(self):
|
||||
r = self.change_password(password=uuid.uuid4().hex,
|
||||
expected_status=http_client.BAD_REQUEST)
|
||||
|
@ -0,0 +1,6 @@
|
||||
---
|
||||
upgrade:
|
||||
- The identity backend driver interface has changed. We've added a new
|
||||
``change_password()`` method for self service password changes. If you have a
|
||||
custom implementation for the identity driver, you will need to implement
|
||||
this new method.
|
Loading…
Reference in New Issue
Block a user