From 268adb96aaa06ce9174c1a45967629f3052e902c Mon Sep 17 00:00:00 2001 From: Konstantinos Kostis Date: Wed, 3 Sep 2014 15:10:47 +0200 Subject: [PATCH] types: addition of EncryptedType * Adds a type that has the name `EncryptedType`. This type gives the possibility to encrypt a value, so that in the database exists the encrypted form of the value and not the original one. When a value needs to be read(and is of type `EncryptedType`) the value automatically decrypted in order to present to the user. * Two encryption/decryption engines are currently implemented. One is using the AES algorithm and the other is using the Fernet algorithm. These are implemented in a library that is called Cryptography. * Inside the tests folder there is a test whose name is `test_encrypted` and tests the functionality of the above type. * Inside the `encrypted.py` file there is documentation with a complete example. Also the `data_types.rst` is modified in order to add a section for the `EncryptedType`. Signed-off-by: Konstantinos Kostis --- docs/data_types.rst | 6 + setup.py | 3 +- sqlalchemy_utils/__init__.py | 2 + sqlalchemy_utils/types/__init__.py | 2 + sqlalchemy_utils/types/encrypted.py | 233 ++++++++++++++++++++++++++++ tests/types/test_encrypted.py | 150 ++++++++++++++++++ 6 files changed, 395 insertions(+), 1 deletion(-) create mode 100644 sqlalchemy_utils/types/encrypted.py create mode 100644 tests/types/test_encrypted.py diff --git a/docs/data_types.rst b/docs/data_types.rst index a8bd143..1281a65 100644 --- a/docs/data_types.rst +++ b/docs/data_types.rst @@ -38,6 +38,12 @@ CountryType .. autoclass:: CountryType +EncryptedType +^^^^^^^^^^^^^ + +.. module:: sqlalchemy_utils.types.encrypted + +.. autoclass:: EncryptedType JSONType ^^^^^^^^ diff --git a/setup.py b/setup.py index 57b431e..cef5d26 100644 --- a/setup.py +++ b/setup.py @@ -46,7 +46,8 @@ extras_require = { 'color': ['colour>=0.0.4'], 'ipaddress': ['ipaddr'] if not PY3 else [], 'timezone': ['python-dateutil'], - 'url': ['furl >= 0.3.5'] if not PY3 else [] + 'url': ['furl >= 0.3.5'] if not PY3 else [], + 'encrypted': ['cryptography==0.6'] } diff --git a/sqlalchemy_utils/__init__.py b/sqlalchemy_utils/__init__.py index cd2f2ed..85bef09 100644 --- a/sqlalchemy_utils/__init__.py +++ b/sqlalchemy_utils/__init__.py @@ -54,6 +54,7 @@ from .types import ( DateRangeType, DateTimeRangeType, EmailType, + EncryptedType, instrumented_list, InstrumentedList, IntRangeType, @@ -130,6 +131,7 @@ __all__ = ( DateRangeType, DateTimeRangeType, EmailType, + EncryptedType, ExpressionParser, ImproperlyConfigured, InstrumentedList, diff --git a/sqlalchemy_utils/types/__init__.py b/sqlalchemy_utils/types/__init__.py index 471121f..9e37990 100644 --- a/sqlalchemy_utils/types/__init__.py +++ b/sqlalchemy_utils/types/__init__.py @@ -5,6 +5,7 @@ from .choice import ChoiceType, Choice from .color import ColorType from .country import CountryType, Country from .email import EmailType +from .encrypted import EncryptedType from .ip_address import IPAddressType from .json import JSONType from .locale import LocaleType @@ -34,6 +35,7 @@ __all__ = ( DateRangeType, DateTimeRangeType, EmailType, + EncryptedType, IntRangeType, IPAddressType, JSONType, diff --git a/sqlalchemy_utils/types/encrypted.py b/sqlalchemy_utils/types/encrypted.py new file mode 100644 index 0000000..ae5dd3b --- /dev/null +++ b/sqlalchemy_utils/types/encrypted.py @@ -0,0 +1,233 @@ +# -*- coding: utf-8 -*- +import base64 +import six +from sqlalchemy.types import TypeDecorator, String +from sqlalchemy_utils.exceptions import ImproperlyConfigured + +cryptography = None +try: + import cryptography + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.ciphers import( + Cipher, algorithms, modes + ) + from cryptography.fernet import Fernet +except ImportError: + pass + + +class EncryptionDecryptionBaseEngine(object): + """A base encryption and decryption engine. + + This class must be sub-classed in order to create + new engines. + """ + + def __init__(self, key): + """Initialize a base engine.""" + if isinstance(key, six.string_types): + key = six.b(key) + self._digest = hashes.Hash(hashes.SHA256(), backend=default_backend()) + self._digest.update(key) + self._engine_key = self._digest.finalize() + + def encrypt(self, value): + raise NotImplementedError('Subclasses must implement this!') + + def decrypt(self, value): + raise NotImplementedError('Subclasses must implement this!') + + +class AesEngine(EncryptionDecryptionBaseEngine): + """Provide AES encryption and decryption methods.""" + + BLOCK_SIZE = 16 + PADDING = six.b('*') + + def __init__(self, key): + super(AesEngine, self).__init__(key) + self._initialize_engine(self._engine_key) + + def _update_key(self, new_key): + parent = EncryptionDecryptionBaseEngine(new_key) + self._initialize_engine(parent._engine_key) + + def _initialize_engine(self, parent_class_key): + self.secret_key = parent_class_key + self.iv = self.secret_key[:16] + self.cipher = Cipher( + algorithms.AES(self.secret_key), + modes.CBC(self.iv), + backend=default_backend() + ) + + def _pad(self, value): + """Pad the message to be encrypted, if needed.""" + BS = self.BLOCK_SIZE + P = self.PADDING + padded = (value + (BS - len(value) % BS) * P) + return padded + + def encrypt(self, value): + if not isinstance(value, six.string_types): + value = repr(value) + if isinstance(value, six.text_type): + value = str(value) + value = six.b(value) + value = self._pad(value) + encryptor = self.cipher.encryptor() + encrypted = encryptor.update(value) + encryptor.finalize() + encrypted = base64.b64encode(encrypted) + return encrypted + + def decrypt(self, value): + if isinstance(value, six.text_type): + value = str(value) + decryptor = self.cipher.decryptor() + decrypted = base64.b64decode(value) + decrypted = decryptor.update(decrypted)+decryptor.finalize() + decrypted = decrypted.rstrip(self.PADDING) + if not isinstance(decrypted, six.string_types): + decrypted = decrypted.decode('utf-8') + return decrypted + + +class FernetEngine(EncryptionDecryptionBaseEngine): + """Provide Fernet encryption and decryption methods.""" + + def __init__(self, key): + super(FernetEngine, self).__init__(key) + self._initialize_engine(self._engine_key) + + def _update_key(self, new_key): + parent = EncryptionDecryptionBaseEngine(new_key) + self._initialize_engine(parent._engine_key) + + def _initialize_engine(self, parent_class_key): + self.secret_key = base64.urlsafe_b64encode(parent_class_key) + self.fernet = Fernet(self.secret_key) + + def encrypt(self, value): + if not isinstance(value, six.string_types): + value = repr(value) + if isinstance(value, six.text_type): + value = str(value) + value = six.b(value) + encrypted = self.fernet.encrypt(value) + return encrypted + + def decrypt(self, value): + if isinstance(value, six.text_type): + value = str(value) + decrypted = self.fernet.decrypt(value) + if not isinstance(decrypted, six.string_types): + decrypted = decrypted.decode('utf-8') + return decrypted + + +class EncryptedType(TypeDecorator): + """ + EncryptedType provides a way to encrypt and decrypt values, + to and from databases, that their type is a basic SQLAlchemy type. + For example Unicode, String or even Boolean. + On the way in, the value is encrypted and on the way out the stored value + is decrypted. + + EncryptedType needs Cryptography_ library in order to work. + A simple example is given below. + + .. _Cryptography: https://cryptography.io/en/latest/ + + :: + + import sqlalchemy as sa + from sqlalchemy.ext.declarative import declarative_base + from sqlalchemy import create_engine + from sqlalchemy.orm import sessionmaker + from sqlalchemy_utils import EncryptedType + + + secret_key = 'secretkey1234' + # setup + engine = create_engine('sqlite:///:memory:') + connection = engine.connect() + Base = declarative_base() + + class User(Base): + __tablename__ = "user" + id = sa.Column(sa.Integer, primary_key=True) + username = sa.Column(EncryptedType(sa.Unicode, secret_key)) + access_token = sa.Column(EncryptedType(sa.String, secret_key)) + is_active = sa.Column(EncryptedType(sa.Boolean, secret_key)) + number_of_accounts = sa.Column(EncryptedType(sa.Integer, + secret_key)) + + sa.orm.configure_mappers() + Base.metadata.create_all(connection) + + # create a configured "Session" class + Session = sessionmaker(bind=connection) + + # create a Session + session = Session() + + # example + user_name = u'secret_user' + test_token = 'atesttoken' + active = True + num_of_accounts = 2 + + user = User(username=user_name, access_token=test_token, + is_active=active, accounts_num=accounts) + session.add(user) + session.commit() + + print('id: {}'.format(user.id)) + print('username: {}'.format(user.username)) + print('token: {}'.format(user.access_token)) + print('active: {}'.format(user.is_active)) + print('accounts: {}'.format(user.accounts_num)) + + # teardown + session.close_all() + Base.metadata.drop_all(connection) + connection.close() + engine.dispose() + """ + + impl = String + + def __init__(self, type_in=None, key=None, engine=None, **kwargs): + """Initialization.""" + if not cryptography: + raise ImproperlyConfigured( + "'cryptography' is required to use EncryptedType" + ) + super(EncryptedType, self).__init__(**kwargs) + # set the underlying type + if type_in is None: + type_in = String() + self.underlying_type = type_in() + self._key = key + if not engine: + engine = AesEngine + self.engine = engine(self._key) + + @property + def key(self): + return self._key + + @key.setter + def key(self, value): + self._key = value + self.engine._update_key(self._key) + + def process_bind_param(self, value, dialect): + """Encrypt a value on the way in.""" + return self.engine.encrypt(value) + + def process_result_value(self, value, dialect): + """Decrypt value on the way out.""" + decrypted_value = self.engine.decrypt(value) + return self.underlying_type.python_type(decrypted_value) diff --git a/tests/types/test_encrypted.py b/tests/types/test_encrypted.py new file mode 100644 index 0000000..e77a18b --- /dev/null +++ b/tests/types/test_encrypted.py @@ -0,0 +1,150 @@ +import sqlalchemy as sa +from tests import TestCase +from sqlalchemy_utils import EncryptedType +from sqlalchemy_utils.types.encrypted import AesEngine +from sqlalchemy_utils.types.encrypted import FernetEngine + +class EncryptedTypeTestCase(TestCase): + + def setup_method(self, method): + # set some test values + self.test_key = 'secretkey1234' + self.user_name = u'someone' + self.test_token = self.generate_test_token() + self.active = True + self.accounts_num = 2 + self.searched_user = None + super(EncryptedTypeTestCase, self).setup_method(method) + # set the values to the user object + self.user = self.User() + self.user.username = self.user_name + self.user.access_token = self.test_token + self.user.is_active = self.active + self.user.accounts_num = self.accounts_num + self.session.add(self.user) + self.session.commit() + + def teardown_method(self, method): + self.session.delete(self.user) + self.session.commit() + del self.user_name + del self.test_token + del self.active + del self.accounts_num + del self.test_key + del self.searched_user + super(EncryptedTypeTestCase, self).teardown_method(method) + + def create_models(self): + class User(self.Base): + __tablename__ = 'user' + id = sa.Column(sa.Integer, primary_key=True) + username = sa.Column(EncryptedType( + sa.Unicode, + self.test_key, + self.__class__.encryption_engine)) + access_token = sa.Column(EncryptedType( + sa.String, + self.test_key, + self.__class__.encryption_engine)) + is_active = sa.Column(EncryptedType( + sa.Boolean, + self.test_key, + self.__class__.encryption_engine)) + accounts_num = sa.Column(EncryptedType( + sa.Integer, + self.test_key, + self.__class__.encryption_engine)) + + def __repr__(self): + return ( + "User(id={}, username={}, access_token={}," + "active={}, accounts={})".format( + self.id, + self.username, + self.access_token, + self.is_active, + self.accounts_num + ) + ) + + self.User = User + + def assert_username(self, _user): + assert _user.username == self.user_name + + def assert_access_token(self, _user): + assert _user.access_token == self.test_token + + def assert_is_active(self, _user): + assert _user.is_active == self.active + + def assert_accounts_num(self, _user): + assert _user.accounts_num == self.accounts_num + + def generate_test_token(self): + import string + import random + token = "" + characters = string.ascii_letters + string.digits + for i in range(60): + token += ''.join(random.choice(characters)) + return token + + +class TestAesEncryptedTypeTestcase(EncryptedTypeTestCase): + + encryption_engine = AesEngine + + def test_unicode(self): + self.searched_user = self.session.query(self.User).filter( + self.User.access_token == self.test_token + ).first() + self.assert_username(self.searched_user) + + def test_string(self): + self.searched_user = self.session.query(self.User).filter( + self.User.username == self.user_name + ).first() + self.assert_access_token(self.searched_user) + + def test_boolean(self): + self.searched_user = self.session.query(self.User).filter( + self.User.access_token == self.test_token + ).first() + self.assert_is_active(self.searched_user) + + def test_integer(self): + self.searched_user = self.session.query(self.User).filter( + self.User.access_token == self.test_token + ).first() + self.assert_accounts_num(self.searched_user) + + +class TestFernetEnryptedTypeTestCase(EncryptedTypeTestCase): + + encryption_engine = FernetEngine + + def test_unicode(self): + self.searched_user = self.session.query(self.User).filter( + self.User.id == self.user.id + ).first() + self.assert_username(self.searched_user) + + def test_string(self): + self.searched_user = self.session.query(self.User).filter( + self.User.id == self.user.id + ).first() + self.assert_access_token(self.searched_user) + + def test_boolean(self): + self.searched_user = self.session.query(self.User).filter( + self.User.id == self.user.id + ).first() + self.assert_is_active(self.searched_user) + + def test_integer(self): + self.searched_user = self.session.query(self.User).filter( + self.User.id == self.user.id + ).first() + self.assert_accounts_num(self.searched_user)