PCI-DSS Password SQL model changes

This patch creates new password table columns and updates the password
properties to allow for multiple passwords in order to support:

* PCI-DSS 8.2.4: Change user passwords/passphrases at least once every
90 days.

* PCI-DSS 8.2.5: Do not allow an individual to submit a new
password/phrase that is the same as any of the last four
passwords/phrases he or she has used.

Partially-implements: blueprint pci-dss

Change-Id: Ib515b0d8e56a74bb8eb12baa79c8bc133fa5d809
This commit is contained in:
Ronald De Rose 2016-05-09 21:06:36 +00:00 committed by Ron De Rose
parent 8f8f85fbfc
commit 498ea91284
4 changed files with 126 additions and 19 deletions

View File

@ -0,0 +1,30 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import sqlalchemy as sql
def upgrade(migrate_engine):
meta = sql.MetaData()
meta.bind = migrate_engine
created_at = sql.Column('created_at', sql.DateTime(), nullable=True)
expires_at = sql.Column('expires_at', sql.DateTime(), nullable=True)
password_table = sql.Table('password', meta, autoload=True)
password_table.create_column(created_at)
password_table.create_column(expires_at)
now = datetime.datetime.utcnow()
stmt = password_table.update().values(created_at=now)
stmt.execute()

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import sqlalchemy
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy import orm
@ -63,25 +65,39 @@ class User(sql.ModelBase, sql.DictBase):
def name(cls):
return LocalUser.name
# password property
# password properties
@hybrid_property
def password_ref(self):
"""Return the current password."""
if self.local_user and self.local_user.passwords:
return self.local_user.passwords[-1]
return None
@hybrid_property
def password(self):
if self.local_user and self.local_user.passwords:
return self.local_user.passwords[0].password
else:
return None
if self.password_ref:
return self.password_ref.password
return None
@hybrid_property
def password_expires_at(self):
if self.password_ref:
return self.password_ref.expires_at
return None
@password.setter
def password(self, value):
if not value:
if self.local_user and self.local_user.passwords:
self.local_user.passwords = []
else:
if not self.local_user:
self.local_user = LocalUser()
if not self.local_user.passwords:
self.local_user.passwords.append(Password())
self.local_user.passwords[0].password = value
now = datetime.datetime.utcnow()
if not self.local_user:
self.local_user = LocalUser()
# set all previous passwords to be expired
for ref in self.local_user.passwords:
if not ref.expires_at or ref.expires_at > now:
ref.expires_at = now
new_password_ref = Password()
new_password_ref.password = value
new_password_ref.created_at = now
self.local_user.passwords.append(new_password_ref)
@password.expression
def password(cls):
@ -126,17 +142,23 @@ class LocalUser(sql.ModelBase, sql.DictBase):
single_parent=True,
cascade='all,delete-orphan',
lazy='subquery',
backref='local_user')
backref='local_user',
order_by='Password.created_at')
__table_args__ = (sql.UniqueConstraint('domain_id', 'name'), {})
class Password(sql.ModelBase, sql.DictBase):
__tablename__ = 'password'
attributes = ['id', 'local_user_id', 'password']
attributes = ['id', 'local_user_id', 'password', 'created_at',
'expires_at']
id = sql.Column(sql.Integer, primary_key=True)
local_user_id = sql.Column(sql.Integer, sql.ForeignKey('local_user.id',
ondelete='CASCADE'))
password = sql.Column(sql.String(128))
# created_at default set here to safe guard in case it gets missed
created_at = sql.Column(sql.DateTime, nullable=False,
default=datetime.datetime.utcnow)
expires_at = sql.Column(sql.DateTime, nullable=True)
class FederatedUser(sql.ModelBase, sql.ModelDictMixin):

View File

@ -145,7 +145,9 @@ class SqlModels(SqlTests):
def test_password_model(self):
cols = (('id', sql.Integer, None),
('local_user_id', sql.Integer, None),
('password', sql.String, 128))
('password', sql.String, 128),
('created_at', sql.DateTime, None),
('expires_at', sql.DateTime, None))
self.assertExpectedSchema('password', cols)
def test_federated_user_model(self):
@ -232,7 +234,7 @@ class SqlIdentity(SqlTests,
with sql.session_for_read() as session:
new_user_ref = self.identity_api._get_user(session,
new_user_dict['id'])
self.assertFalse(new_user_ref.local_user.passwords)
self.assertFalse(new_user_ref.password)
def test_update_user_with_null_password(self):
user_dict = unit.new_user_ref(
@ -245,7 +247,7 @@ class SqlIdentity(SqlTests,
with sql.session_for_read() as session:
new_user_ref = self.identity_api._get_user(session,
new_user_dict['id'])
self.assertFalse(new_user_ref.local_user.passwords)
self.assertFalse(new_user_ref.password)
def test_delete_user_with_project_association(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)

View File

@ -1381,6 +1381,59 @@ class SqlUpgradeTests(SqlMigrateBase):
self.assertFalse(self.does_constraint_exist('user',
constraint_name))
def test_migration_105_add_password_date_columns(self):
def add_user_model_record(session):
# add a user
user = {'id': uuid.uuid4().hex}
self.insert_dict(session, 'user', user)
# add a local user
local_user = {
'id': 1,
'user_id': user['id'],
'domain_id': 'default',
'name': uuid.uuid4().hex
}
self.insert_dict(session, 'local_user', local_user)
# add a password
password = {
'local_user_id': local_user['id'],
'password': uuid.uuid4().hex
}
self.insert_dict(session, 'password', password)
self.upgrade(104)
session = self.sessionmaker()
password_name = 'password'
# columns before
self.assertTableColumns(password_name,
['id',
'local_user_id',
'password'])
# add record and verify table count is greater than zero
add_user_model_record(session)
password_table = sqlalchemy.Table(password_name, self.metadata,
autoload=True)
cnt = session.query(password_table).count()
self.assertGreater(cnt, 0)
self.metadata.clear()
self.upgrade(105)
# columns after
self.assertTableColumns(password_name,
['id',
'local_user_id',
'password',
'created_at',
'expires_at'])
password_table = sqlalchemy.Table(password_name, self.metadata,
autoload=True)
# verify created_at is not null
null_created_at_cnt = (
session.query(password_table).filter_by(created_at=None).count())
self.assertEqual(null_created_at_cnt, 0)
# verify expires_at is null
null_expires_at_cnt = (
session.query(password_table).filter_by(expires_at=None).count())
self.assertGreater(null_expires_at_cnt, 0)
class MySQLOpportunisticUpgradeTestCase(SqlUpgradeTests):
FIXTURE = test_base.MySQLOpportunisticFixture