types: addition of EncryptedType

* Adds a type that has the name `EncryptedType`. This type gives
  the possibility to encrypt a value, so that in the database exists
  the encrypted form of the value and not the original one. When
  a value needs to be read(and is of type `EncryptedType`) the value
  automatically decrypted in order to present to the user.

* Two encryption/decryption engines are currently implemented.
  One is using the AES algorithm and the other is using the
  Fernet algorithm. These are implemented in a library that is
  called Cryptography.

* Inside the tests folder there is a test whose name is
  `test_encrypted` and tests the functionality of the above type.

* Inside the `encrypted.py` file there is documentation with a
  complete example. Also the `data_types.rst` is modified in order
  to add a section for the `EncryptedType`.

Signed-off-by: Konstantinos Kostis <konstantinos.kostis@cern.ch>
This commit is contained in:
Konstantinos Kostis
2014-09-03 15:10:47 +02:00
parent 242844a513
commit 268adb96aa
6 changed files with 395 additions and 1 deletions

View File

@@ -38,6 +38,12 @@ CountryType
.. autoclass:: CountryType
EncryptedType
^^^^^^^^^^^^^
.. module:: sqlalchemy_utils.types.encrypted
.. autoclass:: EncryptedType
JSONType
^^^^^^^^

View File

@@ -46,7 +46,8 @@ extras_require = {
'color': ['colour>=0.0.4'],
'ipaddress': ['ipaddr'] if not PY3 else [],
'timezone': ['python-dateutil'],
'url': ['furl >= 0.3.5'] if not PY3 else []
'url': ['furl >= 0.3.5'] if not PY3 else [],
'encrypted': ['cryptography==0.6']
}

View File

@@ -54,6 +54,7 @@ from .types import (
DateRangeType,
DateTimeRangeType,
EmailType,
EncryptedType,
instrumented_list,
InstrumentedList,
IntRangeType,
@@ -130,6 +131,7 @@ __all__ = (
DateRangeType,
DateTimeRangeType,
EmailType,
EncryptedType,
ExpressionParser,
ImproperlyConfigured,
InstrumentedList,

View File

@@ -5,6 +5,7 @@ from .choice import ChoiceType, Choice
from .color import ColorType
from .country import CountryType, Country
from .email import EmailType
from .encrypted import EncryptedType
from .ip_address import IPAddressType
from .json import JSONType
from .locale import LocaleType
@@ -34,6 +35,7 @@ __all__ = (
DateRangeType,
DateTimeRangeType,
EmailType,
EncryptedType,
IntRangeType,
IPAddressType,
JSONType,

View File

@@ -0,0 +1,233 @@
# -*- coding: utf-8 -*-
import base64
import six
from sqlalchemy.types import TypeDecorator, String
from sqlalchemy_utils.exceptions import ImproperlyConfigured
cryptography = None
try:
import cryptography
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import(
Cipher, algorithms, modes
)
from cryptography.fernet import Fernet
except ImportError:
pass
class EncryptionDecryptionBaseEngine(object):
"""A base encryption and decryption engine.
This class must be sub-classed in order to create
new engines.
"""
def __init__(self, key):
"""Initialize a base engine."""
if isinstance(key, six.string_types):
key = six.b(key)
self._digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
self._digest.update(key)
self._engine_key = self._digest.finalize()
def encrypt(self, value):
raise NotImplementedError('Subclasses must implement this!')
def decrypt(self, value):
raise NotImplementedError('Subclasses must implement this!')
class AesEngine(EncryptionDecryptionBaseEngine):
"""Provide AES encryption and decryption methods."""
BLOCK_SIZE = 16
PADDING = six.b('*')
def __init__(self, key):
super(AesEngine, self).__init__(key)
self._initialize_engine(self._engine_key)
def _update_key(self, new_key):
parent = EncryptionDecryptionBaseEngine(new_key)
self._initialize_engine(parent._engine_key)
def _initialize_engine(self, parent_class_key):
self.secret_key = parent_class_key
self.iv = self.secret_key[:16]
self.cipher = Cipher(
algorithms.AES(self.secret_key),
modes.CBC(self.iv),
backend=default_backend()
)
def _pad(self, value):
"""Pad the message to be encrypted, if needed."""
BS = self.BLOCK_SIZE
P = self.PADDING
padded = (value + (BS - len(value) % BS) * P)
return padded
def encrypt(self, value):
if not isinstance(value, six.string_types):
value = repr(value)
if isinstance(value, six.text_type):
value = str(value)
value = six.b(value)
value = self._pad(value)
encryptor = self.cipher.encryptor()
encrypted = encryptor.update(value) + encryptor.finalize()
encrypted = base64.b64encode(encrypted)
return encrypted
def decrypt(self, value):
if isinstance(value, six.text_type):
value = str(value)
decryptor = self.cipher.decryptor()
decrypted = base64.b64decode(value)
decrypted = decryptor.update(decrypted)+decryptor.finalize()
decrypted = decrypted.rstrip(self.PADDING)
if not isinstance(decrypted, six.string_types):
decrypted = decrypted.decode('utf-8')
return decrypted
class FernetEngine(EncryptionDecryptionBaseEngine):
"""Provide Fernet encryption and decryption methods."""
def __init__(self, key):
super(FernetEngine, self).__init__(key)
self._initialize_engine(self._engine_key)
def _update_key(self, new_key):
parent = EncryptionDecryptionBaseEngine(new_key)
self._initialize_engine(parent._engine_key)
def _initialize_engine(self, parent_class_key):
self.secret_key = base64.urlsafe_b64encode(parent_class_key)
self.fernet = Fernet(self.secret_key)
def encrypt(self, value):
if not isinstance(value, six.string_types):
value = repr(value)
if isinstance(value, six.text_type):
value = str(value)
value = six.b(value)
encrypted = self.fernet.encrypt(value)
return encrypted
def decrypt(self, value):
if isinstance(value, six.text_type):
value = str(value)
decrypted = self.fernet.decrypt(value)
if not isinstance(decrypted, six.string_types):
decrypted = decrypted.decode('utf-8')
return decrypted
class EncryptedType(TypeDecorator):
"""
EncryptedType provides a way to encrypt and decrypt values,
to and from databases, that their type is a basic SQLAlchemy type.
For example Unicode, String or even Boolean.
On the way in, the value is encrypted and on the way out the stored value
is decrypted.
EncryptedType needs Cryptography_ library in order to work.
A simple example is given below.
.. _Cryptography: https://cryptography.io/en/latest/
::
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy_utils import EncryptedType
secret_key = 'secretkey1234'
# setup
engine = create_engine('sqlite:///:memory:')
connection = engine.connect()
Base = declarative_base()
class User(Base):
__tablename__ = "user"
id = sa.Column(sa.Integer, primary_key=True)
username = sa.Column(EncryptedType(sa.Unicode, secret_key))
access_token = sa.Column(EncryptedType(sa.String, secret_key))
is_active = sa.Column(EncryptedType(sa.Boolean, secret_key))
number_of_accounts = sa.Column(EncryptedType(sa.Integer,
secret_key))
sa.orm.configure_mappers()
Base.metadata.create_all(connection)
# create a configured "Session" class
Session = sessionmaker(bind=connection)
# create a Session
session = Session()
# example
user_name = u'secret_user'
test_token = 'atesttoken'
active = True
num_of_accounts = 2
user = User(username=user_name, access_token=test_token,
is_active=active, accounts_num=accounts)
session.add(user)
session.commit()
print('id: {}'.format(user.id))
print('username: {}'.format(user.username))
print('token: {}'.format(user.access_token))
print('active: {}'.format(user.is_active))
print('accounts: {}'.format(user.accounts_num))
# teardown
session.close_all()
Base.metadata.drop_all(connection)
connection.close()
engine.dispose()
"""
impl = String
def __init__(self, type_in=None, key=None, engine=None, **kwargs):
"""Initialization."""
if not cryptography:
raise ImproperlyConfigured(
"'cryptography' is required to use EncryptedType"
)
super(EncryptedType, self).__init__(**kwargs)
# set the underlying type
if type_in is None:
type_in = String()
self.underlying_type = type_in()
self._key = key
if not engine:
engine = AesEngine
self.engine = engine(self._key)
@property
def key(self):
return self._key
@key.setter
def key(self, value):
self._key = value
self.engine._update_key(self._key)
def process_bind_param(self, value, dialect):
"""Encrypt a value on the way in."""
return self.engine.encrypt(value)
def process_result_value(self, value, dialect):
"""Decrypt value on the way out."""
decrypted_value = self.engine.decrypt(value)
return self.underlying_type.python_type(decrypted_value)

View File

@@ -0,0 +1,150 @@
import sqlalchemy as sa
from tests import TestCase
from sqlalchemy_utils import EncryptedType
from sqlalchemy_utils.types.encrypted import AesEngine
from sqlalchemy_utils.types.encrypted import FernetEngine
class EncryptedTypeTestCase(TestCase):
def setup_method(self, method):
# set some test values
self.test_key = 'secretkey1234'
self.user_name = u'someone'
self.test_token = self.generate_test_token()
self.active = True
self.accounts_num = 2
self.searched_user = None
super(EncryptedTypeTestCase, self).setup_method(method)
# set the values to the user object
self.user = self.User()
self.user.username = self.user_name
self.user.access_token = self.test_token
self.user.is_active = self.active
self.user.accounts_num = self.accounts_num
self.session.add(self.user)
self.session.commit()
def teardown_method(self, method):
self.session.delete(self.user)
self.session.commit()
del self.user_name
del self.test_token
del self.active
del self.accounts_num
del self.test_key
del self.searched_user
super(EncryptedTypeTestCase, self).teardown_method(method)
def create_models(self):
class User(self.Base):
__tablename__ = 'user'
id = sa.Column(sa.Integer, primary_key=True)
username = sa.Column(EncryptedType(
sa.Unicode,
self.test_key,
self.__class__.encryption_engine))
access_token = sa.Column(EncryptedType(
sa.String,
self.test_key,
self.__class__.encryption_engine))
is_active = sa.Column(EncryptedType(
sa.Boolean,
self.test_key,
self.__class__.encryption_engine))
accounts_num = sa.Column(EncryptedType(
sa.Integer,
self.test_key,
self.__class__.encryption_engine))
def __repr__(self):
return (
"User(id={}, username={}, access_token={},"
"active={}, accounts={})".format(
self.id,
self.username,
self.access_token,
self.is_active,
self.accounts_num
)
)
self.User = User
def assert_username(self, _user):
assert _user.username == self.user_name
def assert_access_token(self, _user):
assert _user.access_token == self.test_token
def assert_is_active(self, _user):
assert _user.is_active == self.active
def assert_accounts_num(self, _user):
assert _user.accounts_num == self.accounts_num
def generate_test_token(self):
import string
import random
token = ""
characters = string.ascii_letters + string.digits
for i in range(60):
token += ''.join(random.choice(characters))
return token
class TestAesEncryptedTypeTestcase(EncryptedTypeTestCase):
encryption_engine = AesEngine
def test_unicode(self):
self.searched_user = self.session.query(self.User).filter(
self.User.access_token == self.test_token
).first()
self.assert_username(self.searched_user)
def test_string(self):
self.searched_user = self.session.query(self.User).filter(
self.User.username == self.user_name
).first()
self.assert_access_token(self.searched_user)
def test_boolean(self):
self.searched_user = self.session.query(self.User).filter(
self.User.access_token == self.test_token
).first()
self.assert_is_active(self.searched_user)
def test_integer(self):
self.searched_user = self.session.query(self.User).filter(
self.User.access_token == self.test_token
).first()
self.assert_accounts_num(self.searched_user)
class TestFernetEnryptedTypeTestCase(EncryptedTypeTestCase):
encryption_engine = FernetEngine
def test_unicode(self):
self.searched_user = self.session.query(self.User).filter(
self.User.id == self.user.id
).first()
self.assert_username(self.searched_user)
def test_string(self):
self.searched_user = self.session.query(self.User).filter(
self.User.id == self.user.id
).first()
self.assert_access_token(self.searched_user)
def test_boolean(self):
self.searched_user = self.session.query(self.User).filter(
self.User.id == self.user.id
).first()
self.assert_is_active(self.searched_user)
def test_integer(self):
self.searched_user = self.session.query(self.User).filter(
self.User.id == self.user.id
).first()
self.assert_accounts_num(self.searched_user)