Update tests, docs, and changelog

This commit is contained in:
Ryan Leckey
2014-12-11 13:45:02 -08:00
parent f6a5189673
commit 65694782ac
3 changed files with 174 additions and 101 deletions

View File

@@ -8,7 +8,8 @@ Here you can see the full list of changes between each SQLAlchemy-Utils release.
^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^
- Fixed PhoneNumber string coercion (#93) - Fixed PhoneNumber string coercion (#93)
- Improved EncryptedType to support more underlying_type's; now supports: Integer, Boolean, Date, Time, DateTime, ColorType, PhoneNumberType, Unicode(Text), String(Text), Enum
- Allow a callable to be used to lookup the key for EncryptedType
0.27.11 (2014-12-06) 0.27.11 (2014-12-06)
^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^

View File

@@ -181,6 +181,20 @@ class EncryptedType(TypeDecorator, ScalarCoercible):
Base.metadata.drop_all(connection) Base.metadata.drop_all(connection)
connection.close() connection.close()
engine.dispose() engine.dispose()
The key parameter accepts a callable to allow for the key to change
per-row instead of be fixed for the whole table.
::
def get_key():
return "dynamic-key"
class User(Base):
__tablename__ = "user"
id = sa.Column(sa.Integer, primary_key=True)
username = sa.Column(EncryptedType(
sa.Unicode, get_key))
""" """
impl = Binary impl = Binary
@@ -195,7 +209,9 @@ class EncryptedType(TypeDecorator, ScalarCoercible):
# set the underlying type # set the underlying type
if type_in is None: if type_in is None:
type_in = String() type_in = String()
self.underlying_type = type_in() elif isinstance(type_in, type):
type_in = type_in()
self.underlying_type = type_in
self._key = key self._key = key
if not engine: if not engine:
engine = AesEngine engine = AesEngine
@@ -253,6 +269,10 @@ class EncryptedType(TypeDecorator, ScalarCoercible):
if issubclass(type_, bool): if issubclass(type_, bool):
return decrypted_value == "true" return decrypted_value == "true"
elif issubclass(type_, datetime.datetime):
return datetime.datetime.strptime(
decrypted_value, "%Y-%m-%dT%H:%M:%S")
elif issubclass(type_, datetime.time): elif issubclass(type_, datetime.time):
return datetime.datetime.strptime( return datetime.datetime.strptime(
decrypted_value, "%H:%M:%S").time() decrypted_value, "%H:%M:%S").time()
@@ -261,10 +281,6 @@ class EncryptedType(TypeDecorator, ScalarCoercible):
return datetime.datetime.strptime( return datetime.datetime.strptime(
decrypted_value, "%Y-%m-%d").date() decrypted_value, "%Y-%m-%d").date()
elif issubclass(type_, datetime.datetime):
return datetime.datetime.strptime(
decrypted_value, "%Y-%m-%dT%H:%M:%S")
# Handle all others # Handle all others
return self.underlying_type.python_type(decrypted_value) return self.underlying_type.python_type(decrypted_value)

View File

@@ -1,4 +1,6 @@
import sqlalchemy as sa import sqlalchemy as sa
from datetime import datetime, date, time
import pytest
from pytest import mark from pytest import mark
cryptography = None cryptography = None
try: try:
@@ -7,42 +9,67 @@ except ImportError:
pass pass
from tests import TestCase from tests import TestCase
from sqlalchemy_utils import EncryptedType from sqlalchemy_utils import EncryptedType, PhoneNumberType, ColorType
from sqlalchemy_utils.types.encrypted import AesEngine, FernetEngine from sqlalchemy_utils.types.encrypted import AesEngine, FernetEngine
@mark.skipif('cryptography is None') @mark.skipif('cryptography is None')
class EncryptedTypeTestCase(TestCase): class EncryptedTypeTestCase(TestCase):
def setup_method(self, method):
# set some test values @pytest.fixture(scope="function")
self.test_key = 'secretkey1234' def user(self, request):
self.user_name = u'someone'
self.test_token = self.generate_test_token()
self.active = True
self.accounts_num = 2
self.searched_user = None
super(EncryptedTypeTestCase, self).setup_method(method)
# set the values to the user object # set the values to the user object
self.user = self.User() self.user = self.User()
self.user.username = self.user_name self.user.username = self.user_name
self.user.phone = self.user_phone
self.user.color = self.user_color
self.user.date = self.user_date
self.user.time = self.user_time
self.user.enum = self.user_enum
self.user.datetime = self.user_datetime
self.user.access_token = self.test_token self.user.access_token = self.test_token
self.user.is_active = self.active self.user.is_active = self.active
self.user.accounts_num = self.accounts_num self.user.accounts_num = self.accounts_num
self.session.add(self.user) self.session.add(self.user)
self.session.commit() self.session.commit()
def teardown_method(self, method): # register a finalizer to cleanup
self.session.delete(self.user) def finalize():
self.session.commit() del self.user_name
del self.user_name del self.test_token
del self.test_token del self.active
del self.active del self.accounts_num
del self.accounts_num del self.test_key
del self.test_key del self.searched_user
del self.searched_user
super(EncryptedTypeTestCase, self).teardown_method(method) request.addfinalizer(finalize)
return self.session.query(self.User).get(self.user.id)
def generate_test_token(self):
import string
import random
token = ""
characters = string.ascii_letters + string.digits
for i in range(60):
token += ''.join(random.choice(characters))
return token
def create_models(self): def create_models(self):
# set some test values
self.test_key = 'secretkey1234'
self.user_name = u'someone'
self.user_phone = u'(555) 555-5555'
self.user_color = u'#fff'
self.user_enum = "One"
self.user_date = date(2010, 10, 2)
self.user_time = time(10, 12)
self.user_datetime = datetime(2010, 10, 2, 10, 12)
self.test_token = self.generate_test_token()
self.active = True
self.accounts_num = 2
self.searched_user = None
class User(self.Base): class User(self.Base):
__tablename__ = 'user' __tablename__ = 'user'
id = sa.Column(sa.Integer, primary_key=True) id = sa.Column(sa.Integer, primary_key=True)
@@ -62,96 +89,125 @@ class EncryptedTypeTestCase(TestCase):
sa.Integer, sa.Integer,
self.test_key, self.test_key,
self.__class__.encryption_engine)) self.__class__.encryption_engine))
phone = sa.Column(EncryptedType(
def __repr__(self): PhoneNumberType,
return ( self.test_key,
"User(id={}, username={}, access_token={}," self.__class__.encryption_engine))
"active={}, accounts={})".format( color = sa.Column(EncryptedType(
self.id, ColorType,
self.username, self.test_key,
self.access_token, self.__class__.encryption_engine))
self.is_active, date = sa.Column(EncryptedType(
self.accounts_num sa.Date,
) self.test_key,
) self.__class__.encryption_engine))
time = sa.Column(EncryptedType(
sa.Time,
self.test_key,
self.__class__.encryption_engine))
datetime = sa.Column(EncryptedType(
sa.DateTime,
self.test_key,
self.__class__.encryption_engine))
enum = sa.Column(EncryptedType(
sa.Enum("One", name="user_enum_t"),
self.test_key,
self.__class__.encryption_engine))
self.User = User self.User = User
def assert_username(self, _user): class Team(self.Base):
assert _user.username == self.user_name __tablename__ = 'team'
id = sa.Column(sa.Integer, primary_key=True)
key = sa.Column(sa.Unicode(50))
name = sa.Column(EncryptedType(
sa.Unicode,
lambda: self._team_key,
self.__class__.encryption_engine))
def assert_access_token(self, _user): self.Team = Team
assert _user.access_token == self.test_token
def assert_is_active(self, _user): def test_unicode(self, user):
assert _user.is_active == self.active assert user.username == self.user_name
def assert_accounts_num(self, _user): def test_string(self, user):
assert _user.accounts_num == self.accounts_num assert user.access_token == self.test_token
def generate_test_token(self): def test_boolean(self, user):
import string assert user.is_active == self.active
import random
token = "" def test_integer(self, user):
characters = string.ascii_letters + string.digits assert user.accounts_num == self.accounts_num
for i in range(60):
token += ''.join(random.choice(characters)) def test_phone_number(self, user):
return token assert str(user.phone) == self.user_phone
def test_color(self, user):
assert user.color.hex == self.user_color
def test_date(self, user):
assert user.date == self.user_date
def test_datetime(self, user):
assert user.datetime == self.user_datetime
def test_time(self, user):
assert user.time == self.user_time
def test_enum(self, user):
assert user.enum == self.user_enum
def test_lookup_key(self):
# Add teams
self._team_key = "one"
team = self.Team(key=self._team_key, name="One")
self.session.add(team)
self.session.flush()
team_1_id = team.id
self._team_key = "two"
team = self.Team(key=self._team_key, name="Two")
self.session.add(team)
self.session.flush()
team_2_id = team.id
self.session.commit()
# Lookup teams
self._team_key = self.session.query(self.Team.key).filter_by(
id=team_1_id).one()[0]
team = self.session.query(self.Team).get(team_1_id)
assert team.name == "One"
with pytest.raises(Exception):
self.session.query(self.Team).get(team_2_id)
self._team_key = self.session.query(self.Team.key).filter_by(
id=team_2_id).one()[0]
team = self.session.query(self.Team).get(team_2_id)
assert team.name == "Two"
with pytest.raises(Exception):
self.session.query(self.Team).get(team_1_id)
# Remove teams
self.session.query(self.Team).delete()
self.session.commit()
class TestAesEncryptedTypeTestcase(EncryptedTypeTestCase): class TestAesEncryptedTypeTestcase(EncryptedTypeTestCase):
encryption_engine = AesEngine encryption_engine = AesEngine
def test_unicode(self): def test_lookup_by_encrypted_string(self, user):
self.searched_user = self.session.query(self.User).filter( test = self.session.query(self.User).filter(
self.User.access_token == self.test_token self.User.username == self.user_name).first()
).first()
self.assert_username(self.searched_user)
def test_string(self): assert test.username == user.username
self.searched_user = self.session.query(self.User).filter(
self.User.username == self.user_name
).first()
self.assert_access_token(self.searched_user)
def test_boolean(self):
self.searched_user = self.session.query(self.User).filter(
self.User.access_token == self.test_token
).first()
self.assert_is_active(self.searched_user)
def test_integer(self):
self.searched_user = self.session.query(self.User).filter(
self.User.access_token == self.test_token
).first()
self.assert_accounts_num(self.searched_user)
class TestFernetEnryptedTypeTestCase(EncryptedTypeTestCase): class TestFernetEncryptedTypeTestCase(EncryptedTypeTestCase):
encryption_engine = FernetEngine encryption_engine = FernetEngine
def test_unicode(self):
self.searched_user = self.session.query(self.User).filter(
self.User.id == self.user.id
).first()
self.assert_username(self.searched_user)
def test_string(self):
self.searched_user = self.session.query(self.User).filter(
self.User.id == self.user.id
).first()
self.assert_access_token(self.searched_user)
def test_boolean(self):
self.searched_user = self.session.query(self.User).filter(
self.User.id == self.user.id
).first()
self.assert_is_active(self.searched_user)
def test_integer(self):
self.searched_user = self.session.query(self.User).filter(
self.User.id == self.user.id
).first()
self.assert_accounts_num(self.searched_user)