From b56d8f173fdc844b2289b88ae9f5e8fdade1ce07 Mon Sep 17 00:00:00 2001 From: Ryan Leckey Date: Wed, 10 Jul 2013 01:04:18 -0700 Subject: [PATCH] Add password type. --- setup.py | 3 +- sqlalchemy_utils/__init__.py | 2 + sqlalchemy_utils/types/__init__.py | 2 + sqlalchemy_utils/types/password.py | 108 +++++++++++++++++++++++++++++ tests/test_password.py | 70 +++++++++++++++++++ 5 files changed, 184 insertions(+), 1 deletion(-) create mode 100644 sqlalchemy_utils/types/password.py create mode 100644 tests/test_password.py diff --git a/setup.py b/setup.py index 316bf8a..d1588cc 100644 --- a/setup.py +++ b/setup.py @@ -55,7 +55,8 @@ setup( 'Jinja2>=2.3', 'docutils>=0.10', 'flexmock>=0.9.7', - ] + ], + 'password': ['passlib >= 1.6, < 2.0'] }, cmdclass={'test': PyTest}, classifiers=[ diff --git a/sqlalchemy_utils/__init__.py b/sqlalchemy_utils/__init__.py index cf659af..5110d16 100644 --- a/sqlalchemy_utils/__init__.py +++ b/sqlalchemy_utils/__init__.py @@ -11,6 +11,7 @@ from .types import ( instrumented_list, InstrumentedList, IPAddressType, + PasswordType, PhoneNumber, PhoneNumberType, NumberRange, @@ -46,6 +47,7 @@ __all__ = ( NumberRangeException, NumberRangeRawType, NumberRangeType, + PasswordType, PhoneNumber, PhoneNumberType, ProxyDict, diff --git a/sqlalchemy_utils/types/__init__.py b/sqlalchemy_utils/types/__init__.py index 5586b7c..49f6325 100644 --- a/sqlalchemy_utils/types/__init__.py +++ b/sqlalchemy_utils/types/__init__.py @@ -10,6 +10,7 @@ from .number_range import ( NumberRangeRawType, NumberRangeType, ) +from .password import PasswordType from .phone_number import PhoneNumber, PhoneNumberType from .scalar_list import ScalarListException, ScalarListType @@ -22,6 +23,7 @@ __all__ = ( NumberRangeException, NumberRangeRawType, NumberRangeType, + PasswordType, PhoneNumber, PhoneNumberType, ScalarListException, diff --git a/sqlalchemy_utils/types/password.py b/sqlalchemy_utils/types/password.py new file mode 100644 index 0000000..f520a03 --- /dev/null +++ b/sqlalchemy_utils/types/password.py @@ -0,0 +1,108 @@ +import six +import weakref +from sqlalchemy_utils import ImproperlyConfigured +from sqlalchemy import types + +try: + import passlib + from passlib.context import CryptContext + +except ImportError: + passlib = None + + +class Password(object): + + def __init__(self, value, context=None): + # Store the hash. + self.raw = value + + # Save weakref of the password context (if we have one) + if context is not None: + self.context = weakref.proxy(context) + + def __eq__(self, value): + valid, new = self.context.verify_and_update(value, self.raw) + if valid and new: + # New hash was calculated due to various reasons; stored one + # wasn't optimal, etc. + self.raw = new + return valid + + def __ne__(self, value): + return not (self == value) + + +class PasswordType(types.TypeDecorator): + """ + Hashes passwords as they come into the database and allows verifying + them using a pythonic interface :: + + >>> target = Model() + >>> target.password = 'b' + '$5$rounds=80000$H.............' + + >>> target.password == 'b' + True + + """ + + impl = types.BINARY(60) + + python_type = Password + + def __init__(self, **kwargs): + """ + All keyword arguments are forwarded to the construction of a + `passlib.context.CryptContext` object. + + The following usage will create a password column that will + automatically hash new passwords as `pbkdf2_sha512` but still compare + passwords against pre-existing `md5_crypt` hashes. As passwords are + compared; the password hash in the database will be updated to + be `pbkdf2_sha512`. :: + + class Model(Base): + password = sa.Column(PasswordType( + schemes=[ + 'pbkdf2_sha512', + 'md5_crypt' + ], + + deprecated=['md5_crypt'] + )) + + """ + + # Bail if passlib is not found. + if passlib is None: + raise ImproperlyConfigured( + "'passlib' is required to use 'PasswordType'") + + # Construct the passlib crypt context. + self.context = CryptContext(**kwargs) + + def process_bind_param(self, value, dialect): + if isinstance(value, Password): + # Value has already been hashed. + return value.raw + + if isinstance(value, six.string_types): + # Assume value has not been hashed. + return self.context.encrypt(value).encode('utf8') + + def process_result_value(self, value, dialect): + if value is not None: + return Password(value, self.context) + + def coercion_listener(self, target, value, oldvalue, initiator): + if not isinstance(value, Password): + # Hash the password using the default scheme. + value = self.context.encrypt(value).encode('utf8') + return Password(value, context=self.context) + + else: + # If were given a password object; ensure the context is right. + value.context = weakref.proxy(self.context) + + return value diff --git a/tests/test_password.py b/tests/test_password.py new file mode 100644 index 0000000..7dee13f --- /dev/null +++ b/tests/test_password.py @@ -0,0 +1,70 @@ +from pytest import mark +import sqlalchemy as sa +from tests import TestCase +from sqlalchemy_utils.types import password +from sqlalchemy_utils import coercion_listener + + +@mark.xfail('password.passlib is None') +class TestPasswordType(TestCase): + + def create_models(self): + class User(self.Base): + __tablename__ = 'user' + id = sa.Column(sa.Integer, primary_key=True) + password = sa.Column(password.PasswordType( + schemes=[ + 'pbkdf2_sha512', + 'md5_crypt' + ], + + deprecated=['md5_crypt'] + )) + + def __repr__(self): + return 'User(%r)' % self.id + + self.User = User + sa.event.listen(sa.orm.mapper, 'mapper_configured', coercion_listener) + + def test_encrypt(self): + """Should encrypt the password on setting the attribute.""" + obj = self.User() + obj.password = b'b' + + assert obj.password.raw != 'b' + assert obj.password.raw.startswith(b'$pbkdf2-sha512$') + + def test_check(self): + """ + Should be able to compare the plaintext against the + encrypted form. + """ + obj = self.User() + obj.password = 'b' + + assert obj.password == 'b' + assert obj.password != 'a' + + self.session.add(obj) + self.session.commit() + + obj = self.session.query(self.User).get(obj.id) + + assert obj.password == b'b' + assert obj.password != 'a' + + def test_check_and_update(self): + """ + Should be able to compare the plaintext against a deprecated + encrypted form and have it auto-update to the preferred version. + """ + + from passlib.hash import md5_crypt + + obj = self.User() + obj.password = password.Password(md5_crypt.encrypt('b')) + + assert obj.password.raw.startswith('$1$') + assert obj.password == 'b' + assert obj.password.raw.startswith('$pbkdf2-sha512$')