Merge "Port crypto_utils to Python 3"
This commit is contained in:
2
tox.ini
2
tox.ini
@@ -31,7 +31,9 @@ commands =
|
||||
# all unit tests will pass on Python 3.
|
||||
commands =
|
||||
python -bb -m testtools.run \
|
||||
trove/tests/unittests/common/test_common_extensions.py \
|
||||
trove/tests/unittests/common/test_context.py \
|
||||
trove/tests/unittests/common/test_crypto_utils.py \
|
||||
trove/tests/unittests/common/test_exception.py \
|
||||
trove/tests/unittests/common/test_notification.py \
|
||||
trove/tests/unittests/common/test_remote.py \
|
||||
|
||||
@@ -19,6 +19,8 @@
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto import Random
|
||||
import hashlib
|
||||
from oslo_utils import encodeutils
|
||||
import six
|
||||
|
||||
from trove.common import stream_codecs
|
||||
|
||||
@@ -27,6 +29,8 @@ IV_BIT_COUNT = 16
|
||||
|
||||
|
||||
def encode_data(data):
|
||||
if isinstance(data, six.text_type):
|
||||
data = data.encode('utf-8')
|
||||
return stream_codecs.Base64Codec().serialize(data)
|
||||
|
||||
|
||||
@@ -37,17 +41,20 @@ def decode_data(data):
|
||||
# Pad the data string to an multiple of pad_size
|
||||
def pad_for_encryption(data, pad_size=IV_BIT_COUNT):
|
||||
pad_count = pad_size - (len(data) % pad_size)
|
||||
return data + chr(pad_count) * pad_count
|
||||
return data + six.int2byte(pad_count) * pad_count
|
||||
|
||||
|
||||
# Unpad the data string by stripping off excess characters
|
||||
def unpad_after_decryption(data):
|
||||
return data[:len(data) - ord(data[-1])]
|
||||
return data[:len(data) - six.indexbytes(data, -1)]
|
||||
|
||||
|
||||
def encrypt_data(data, key, iv_bit_count=IV_BIT_COUNT):
|
||||
data = encodeutils.to_utf8(data)
|
||||
key = encodeutils.to_utf8(key)
|
||||
md5_key = hashlib.md5(key).hexdigest()
|
||||
iv = Random.new().read(iv_bit_count)[:iv_bit_count]
|
||||
iv = Random.new().read(iv_bit_count)
|
||||
iv = iv[:iv_bit_count]
|
||||
aes = AES.new(md5_key, AES.MODE_CBC, iv)
|
||||
data = pad_for_encryption(data, iv_bit_count)
|
||||
encrypted = aes.encrypt(data)
|
||||
@@ -55,6 +62,7 @@ def encrypt_data(data, key, iv_bit_count=IV_BIT_COUNT):
|
||||
|
||||
|
||||
def decrypt_data(data, key, iv_bit_count=IV_BIT_COUNT):
|
||||
key = encodeutils.to_utf8(key)
|
||||
md5_key = hashlib.md5(key).hexdigest()
|
||||
iv = data[:iv_bit_count]
|
||||
aes = AES.new(md5_key, AES.MODE_CBC, bytes(iv))
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
#
|
||||
|
||||
from Crypto import Random
|
||||
import mock
|
||||
import six
|
||||
|
||||
from trove.common import crypto_utils
|
||||
from trove.tests.unittests import trove_testtools
|
||||
@@ -30,17 +32,20 @@ class TestEncryptUtils(trove_testtools.TestCase):
|
||||
|
||||
def test_encode_decode_string(self):
|
||||
random_data = bytearray(Random.new().read(12))
|
||||
data = ['abc', 'numbers01234', '\x00\xFF\x00\xFF\xFF\x00', random_data]
|
||||
data = [b'abc', b'numbers01234', b'\x00\xFF\x00\xFF\xFF\x00',
|
||||
random_data, u'Unicode:\u20ac']
|
||||
|
||||
for datum in data:
|
||||
encoded_data = crypto_utils.encode_data(datum)
|
||||
decoded_data = crypto_utils.decode_data(encoded_data)
|
||||
if isinstance(datum, six.text_type):
|
||||
decoded_data = decoded_data.decode('utf-8')
|
||||
self. assertEqual(datum, decoded_data,
|
||||
"Encode/decode failed")
|
||||
|
||||
def test_pad_unpad(self):
|
||||
for size in range(1, 100):
|
||||
data_str = 'a' * size
|
||||
data_str = b'a' * size
|
||||
padded_str = crypto_utils.pad_for_encryption(
|
||||
data_str, crypto_utils.IV_BIT_COUNT)
|
||||
self.assertEqual(0, len(padded_str) % crypto_utils.IV_BIT_COUNT,
|
||||
@@ -62,3 +67,46 @@ class TestEncryptUtils(trove_testtools.TestCase):
|
||||
|
||||
self.assertEqual(orig_data, final_decoded,
|
||||
"Decrypted data did not match original")
|
||||
|
||||
def test_encrypt(self):
|
||||
# test encrypt() with an hardcoded IV
|
||||
key = 'my_secure_key'
|
||||
salt = b'x' * crypto_utils.IV_BIT_COUNT
|
||||
|
||||
with mock.patch('Crypto.Random.new') as mock_random:
|
||||
mock_random.return_value.read.return_value = salt
|
||||
|
||||
for orig_data, expected in (
|
||||
# byte string
|
||||
(b'Hello World!',
|
||||
'eHh4eHh4eHh4eHh4eHh4eF5RK6VdDrAWl4Th1mNG2eps+VB2BouFRiY2Wa'
|
||||
'P/RRPT'),
|
||||
|
||||
# Unicoded string (encoded to UTF-8)
|
||||
(u'Unicode:\u20ac',
|
||||
'eHh4eHh4eHh4eHh4eHh4eAMsI5YsrtMNAPJfVF0j9NegXML7OsJ0LuAy66'
|
||||
'LKv5F4'),
|
||||
):
|
||||
orig_encoded = crypto_utils.encode_data(orig_data)
|
||||
encrypted = crypto_utils.encrypt_data(orig_encoded, key)
|
||||
encoded = crypto_utils.encode_data(encrypted)
|
||||
self.assertEqual(expected, encoded)
|
||||
|
||||
def test_decrypt(self):
|
||||
key = 'my_secure_key'
|
||||
|
||||
for encoded, expected in (
|
||||
# byte string: b'Hello World!'
|
||||
('ZUhoNGVIaDRlSGg0ZUhoNL9PmM70hVcQ7j/kYF7Pw+BT7VSfsht0VsCIxy'
|
||||
'KNN0NH',
|
||||
b'Hello World!'),
|
||||
|
||||
# Unicoded string: u'Unicode:\u20ac'
|
||||
('ZUhoNGVIaDRlSGg0ZUhoNIHZLIuIcQCRwWY7PR2y7JcqoDf4ViqXIfh0uE'
|
||||
'Rbg9BA',
|
||||
b'Unicode:\xe2\x82\xac'),
|
||||
):
|
||||
decoded = crypto_utils.decode_data(encoded)
|
||||
decrypted = crypto_utils.decrypt_data(decoded, key)
|
||||
final_decoded = crypto_utils.decode_data(decrypted)
|
||||
self.assertEqual(expected, final_decoded)
|
||||
|
||||
Reference in New Issue
Block a user