From 06faf61523537e76ff25dcfaadd2eb3005f19385 Mon Sep 17 00:00:00 2001 From: Douglas Mendizabal Date: Fri, 17 May 2013 17:23:27 -0500 Subject: [PATCH] Added aes-128-cbc encryption in plugin using pycrypto. --- barbican/crypto/plugin.py | 64 +++++++++++++++++++----- barbican/tests/crypto/test_plugin.py | 74 +++++++++++++++++++++++++++- debian/barbican-common.postinst | 1 + tools/pip-requires | 1 + 4 files changed, 128 insertions(+), 12 deletions(-) diff --git a/barbican/crypto/plugin.py b/barbican/crypto/plugin.py index 4f69923f0..dd2b311f3 100644 --- a/barbican/crypto/plugin.py +++ b/barbican/crypto/plugin.py @@ -15,7 +15,11 @@ import abc +from Crypto.Cipher import AES +from Crypto import Random + from barbican.model.models import EncryptedDatum +from barbican.openstack.common import jsonutils as json class CryptoPluginBase(object): @@ -45,24 +49,62 @@ class CryptoPluginBase(object): class SimpleCryptoPlugin(CryptoPluginBase): """Insecure implementation of the crypto plugin.""" - #TODO: Use PyCrypto to aes encode secrets - def __init__(self): - self.supported_types = ['text/plain', 'application/octet-stream'] + self.supported_types = ['text/plain', 'application/octet-stream', + 'application/aes-128-cbc'] + self.kek = u'sixteen_byte_key' + self.block_size = 16 + + def _pad(self, unencrypted): + try: + unencrypted_bytes = unencrypted.encode('utf-8') + except UnicodeDecodeError: + unencrypted_bytes = unencrypted + pad_length = self.block_size - ( + len(unencrypted_bytes) % self.block_size + ) + return unencrypted_bytes + (chr(pad_length) * pad_length) + + def _strip_pad(self, unencrypted): + try: + unencrypted_bytes = unencrypted.encode('utf-8') + except UnicodeDecodeError: + unencrypted_bytes = unencrypted + pad_length = ord(unencrypted_bytes[-1:]) + unpadded = unencrypted_bytes[:-pad_length] + try: + #TODO: maybe kek_metadata needs to be used to determine + # whether the unpadded byte stream is a utf-8 string or not? + unpadded = unpadded.decode('utf-8') + except UnicodeDecodeError: + pass + return unpadded def encrypt(self, unencrypted, secret, tenant): - encrypted_datum = EncryptedDatum(secret) - encrypted_datum.cypher_text = '[ENcrypt this:{0}]'.format(unencrypted) - encrypted_datum.kek_metadata = "kek_metadata here" - return encrypted_datum + padded_data = self._pad(unencrypted) + iv = Random.get_random_bytes(16) + encryptor = AES.new(self.kek, AES.MODE_CBC, iv) + cyphertext = iv + encryptor.encrypt(padded_data) + + datum = EncryptedDatum() + datum.cypher_text = cyphertext + datum.mime_type = 'application/aes-128-cbc' + datum.kek_metadata = json.dumps({ + 'plugin': 'SimpleCryptoPlugin', + 'kek': 'kek_id' + }) + return datum def decrypt(self, secret_type, secret, tenant): - for encrypted_datum in secret.encrypted_data: - if secret_type == encrypted_datum.mime_type: - return '[DEcrypt this:{0}]'.format(encrypted_datum.cypher_text) - return None + payload = secret.encrypted_data.cypher_text + iv = payload[:16] + cypher_text = payload[16:] + decryptor = AES.new(self.kek, AES.MODE_CBC, iv) + padded_secret = decryptor.decrypt(cypher_text) + return self._strip_pad(padded_secret) def create(self, secret_type): + # TODO: return "insecure_key" def supports(self, secret_type): diff --git a/barbican/tests/crypto/test_plugin.py b/barbican/tests/crypto/test_plugin.py index 35c3152cf..b63b8ed05 100644 --- a/barbican/tests/crypto/test_plugin.py +++ b/barbican/tests/crypto/test_plugin.py @@ -13,7 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from barbican.crypto.plugin import CryptoPluginBase +from Crypto import Random +from mock import MagicMock +import unittest + +from barbican.crypto.plugin import CryptoPluginBase, SimpleCryptoPlugin from barbican.model.models import EncryptedDatum from barbican.openstack.common import jsonutils as json @@ -36,3 +40,71 @@ class TestCryptoPlugin(CryptoPluginBase): def supports(self, secret_type): return secret_type == 'text/plain' + + +class WhenTestingSimpleCryptoPlugin(unittest.TestCase): + + def setUp(self): + self.plugin = SimpleCryptoPlugin() + + def test_pad_binary_string(self): + binary_string = b'some_binary_string' + padded_string = ( + b'some_binary_string' + + b'\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e' + ) + self.assertEqual(self.plugin._pad(binary_string), padded_string) + + def test_pad_unicode_string(self): + unicode_beer = u'\U0001F37A' + padded_beer = (b'\xf0\x9f\x8d\xba' + + b'\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c') + self.assertEqual(self.plugin._pad(unicode_beer), padded_beer) + + def test_pad_random_bytes(self): + random_bytes = Random.get_random_bytes(10) + padded_bytes = random_bytes + b'\x06\x06\x06\x06\x06\x06' + self.assertEqual(self.plugin._pad(random_bytes), padded_bytes) + + def test_strip_padding_from_binary_string(self): + binary_string = b'some_binary_string' + padded_string = ( + b'some_binary_string' + + b'\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e' + ) + self.assertEqual(self.plugin._strip_pad(padded_string), binary_string) + + def test_strip_padding_from_unicode_string(self): + unicode_beer = u'\U0001F37A' + padded_beer = (b'\xf0\x9f\x8d\xba' + + b'\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c') + self.assertEqual(self.plugin._strip_pad(padded_beer), unicode_beer) + + def test_strip_padding_from_random_bytes(self): + random_bytes = Random.get_random_bytes(10) + padded_bytes = random_bytes + b'\x06\x06\x06\x06\x06\x06' + self.assertEqual(self.plugin._strip_pad(padded_bytes), random_bytes) + + def test_byte_string_encryption(self): + unencrypted = 'some_secret' + secret = MagicMock() + encrypted = self.plugin.encrypt(unencrypted, MagicMock(), MagicMock()) + secret.encrypted_data = encrypted + decrypted = self.plugin.decrypt('some_type', secret, MagicMock()) + self.assertEqual(unencrypted, decrypted) + + def test_unicode_string_encryption(self): + unencrypted = u'beer\U0001F37A' + secret = MagicMock() + encrypted = self.plugin.encrypt(unencrypted, MagicMock(), MagicMock()) + secret.encrypted_data = encrypted + decrypted = self.plugin.decrypt('some_type', secret, MagicMock()) + self.assertEqual(unencrypted, decrypted) + + def test_random_bytes_encryption(self): + unencrypted = Random.get_random_bytes(10) + secret = MagicMock() + encrypted = self.plugin.encrypt(unencrypted, MagicMock(), MagicMock()) + secret.encrypted_data = encrypted + decrypted = self.plugin.decrypt('some_type', secret, MagicMock()) + self.assertEqual(unencrypted, decrypted) diff --git a/debian/barbican-common.postinst b/debian/barbican-common.postinst index d189fb18b..0f09f123a 100644 --- a/debian/barbican-common.postinst +++ b/debian/barbican-common.postinst @@ -26,6 +26,7 @@ then pip install webob pip install PasteDeploy pip install stevedore + pip install pycrypto fi #DEBHELPER# diff --git a/tools/pip-requires b/tools/pip-requires index c8b8c8d13..1910fd74c 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -11,6 +11,7 @@ PasteDeploy>=1.5.0 Celery>=3.0.19 python-keystoneclient>=0.2.0 stevedore>=0.8 +pycrypto>=2.6 # SQLAlchemy 0.7.10 typically has issues installing via pip, since it # will be removed as a dependency soon we will just grab the tarball