Added aes-128-cbc encryption in plugin using pycrypto.

This commit is contained in:
Douglas Mendizabal
2013-05-17 17:23:27 -05:00
parent 663a8b05da
commit 06faf61523
4 changed files with 128 additions and 12 deletions

View File

@@ -15,7 +15,11 @@
import abc
from Crypto.Cipher import AES
from Crypto import Random
from barbican.model.models import EncryptedDatum
from barbican.openstack.common import jsonutils as json
class CryptoPluginBase(object):
@@ -45,24 +49,62 @@ class CryptoPluginBase(object):
class SimpleCryptoPlugin(CryptoPluginBase):
"""Insecure implementation of the crypto plugin."""
#TODO: Use PyCrypto to aes encode secrets
def __init__(self):
self.supported_types = ['text/plain', 'application/octet-stream']
self.supported_types = ['text/plain', 'application/octet-stream',
'application/aes-128-cbc']
self.kek = u'sixteen_byte_key'
self.block_size = 16
def _pad(self, unencrypted):
try:
unencrypted_bytes = unencrypted.encode('utf-8')
except UnicodeDecodeError:
unencrypted_bytes = unencrypted
pad_length = self.block_size - (
len(unencrypted_bytes) % self.block_size
)
return unencrypted_bytes + (chr(pad_length) * pad_length)
def _strip_pad(self, unencrypted):
try:
unencrypted_bytes = unencrypted.encode('utf-8')
except UnicodeDecodeError:
unencrypted_bytes = unencrypted
pad_length = ord(unencrypted_bytes[-1:])
unpadded = unencrypted_bytes[:-pad_length]
try:
#TODO: maybe kek_metadata needs to be used to determine
# whether the unpadded byte stream is a utf-8 string or not?
unpadded = unpadded.decode('utf-8')
except UnicodeDecodeError:
pass
return unpadded
def encrypt(self, unencrypted, secret, tenant):
encrypted_datum = EncryptedDatum(secret)
encrypted_datum.cypher_text = '[ENcrypt this:{0}]'.format(unencrypted)
encrypted_datum.kek_metadata = "kek_metadata here"
return encrypted_datum
padded_data = self._pad(unencrypted)
iv = Random.get_random_bytes(16)
encryptor = AES.new(self.kek, AES.MODE_CBC, iv)
cyphertext = iv + encryptor.encrypt(padded_data)
datum = EncryptedDatum()
datum.cypher_text = cyphertext
datum.mime_type = 'application/aes-128-cbc'
datum.kek_metadata = json.dumps({
'plugin': 'SimpleCryptoPlugin',
'kek': 'kek_id'
})
return datum
def decrypt(self, secret_type, secret, tenant):
for encrypted_datum in secret.encrypted_data:
if secret_type == encrypted_datum.mime_type:
return '[DEcrypt this:{0}]'.format(encrypted_datum.cypher_text)
return None
payload = secret.encrypted_data.cypher_text
iv = payload[:16]
cypher_text = payload[16:]
decryptor = AES.new(self.kek, AES.MODE_CBC, iv)
padded_secret = decryptor.decrypt(cypher_text)
return self._strip_pad(padded_secret)
def create(self, secret_type):
# TODO:
return "insecure_key"
def supports(self, secret_type):

View File

@@ -13,7 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from barbican.crypto.plugin import CryptoPluginBase
from Crypto import Random
from mock import MagicMock
import unittest
from barbican.crypto.plugin import CryptoPluginBase, SimpleCryptoPlugin
from barbican.model.models import EncryptedDatum
from barbican.openstack.common import jsonutils as json
@@ -36,3 +40,71 @@ class TestCryptoPlugin(CryptoPluginBase):
def supports(self, secret_type):
return secret_type == 'text/plain'
class WhenTestingSimpleCryptoPlugin(unittest.TestCase):
def setUp(self):
self.plugin = SimpleCryptoPlugin()
def test_pad_binary_string(self):
binary_string = b'some_binary_string'
padded_string = (
b'some_binary_string' +
b'\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e'
)
self.assertEqual(self.plugin._pad(binary_string), padded_string)
def test_pad_unicode_string(self):
unicode_beer = u'\U0001F37A'
padded_beer = (b'\xf0\x9f\x8d\xba' +
b'\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c')
self.assertEqual(self.plugin._pad(unicode_beer), padded_beer)
def test_pad_random_bytes(self):
random_bytes = Random.get_random_bytes(10)
padded_bytes = random_bytes + b'\x06\x06\x06\x06\x06\x06'
self.assertEqual(self.plugin._pad(random_bytes), padded_bytes)
def test_strip_padding_from_binary_string(self):
binary_string = b'some_binary_string'
padded_string = (
b'some_binary_string' +
b'\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e\x0e'
)
self.assertEqual(self.plugin._strip_pad(padded_string), binary_string)
def test_strip_padding_from_unicode_string(self):
unicode_beer = u'\U0001F37A'
padded_beer = (b'\xf0\x9f\x8d\xba' +
b'\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c\x0c')
self.assertEqual(self.plugin._strip_pad(padded_beer), unicode_beer)
def test_strip_padding_from_random_bytes(self):
random_bytes = Random.get_random_bytes(10)
padded_bytes = random_bytes + b'\x06\x06\x06\x06\x06\x06'
self.assertEqual(self.plugin._strip_pad(padded_bytes), random_bytes)
def test_byte_string_encryption(self):
unencrypted = 'some_secret'
secret = MagicMock()
encrypted = self.plugin.encrypt(unencrypted, MagicMock(), MagicMock())
secret.encrypted_data = encrypted
decrypted = self.plugin.decrypt('some_type', secret, MagicMock())
self.assertEqual(unencrypted, decrypted)
def test_unicode_string_encryption(self):
unencrypted = u'beer\U0001F37A'
secret = MagicMock()
encrypted = self.plugin.encrypt(unencrypted, MagicMock(), MagicMock())
secret.encrypted_data = encrypted
decrypted = self.plugin.decrypt('some_type', secret, MagicMock())
self.assertEqual(unencrypted, decrypted)
def test_random_bytes_encryption(self):
unencrypted = Random.get_random_bytes(10)
secret = MagicMock()
encrypted = self.plugin.encrypt(unencrypted, MagicMock(), MagicMock())
secret.encrypted_data = encrypted
decrypted = self.plugin.decrypt('some_type', secret, MagicMock())
self.assertEqual(unencrypted, decrypted)

View File

@@ -26,6 +26,7 @@ then
pip install webob
pip install PasteDeploy
pip install stevedore
pip install pycrypto
fi
#DEBHELPER#

View File

@@ -11,6 +11,7 @@ PasteDeploy>=1.5.0
Celery>=3.0.19
python-keystoneclient>=0.2.0
stevedore>=0.8
pycrypto>=2.6
# SQLAlchemy 0.7.10 typically has issues installing via pip, since it
# will be removed as a dependency soon we will just grab the tarball