Remove EncryptedDatum from plugin.
This commit is contained in:
@@ -17,6 +17,7 @@ from oslo.config import cfg
|
||||
from stevedore import named
|
||||
|
||||
from barbican.common.exception import BarbicanException
|
||||
from barbican.model.models import EncryptedDatum
|
||||
from barbican.openstack.common.gettextutils import _
|
||||
|
||||
|
||||
@@ -84,9 +85,16 @@ class CryptoExtensionManager(named.NamedExtensionManager):
|
||||
|
||||
def encrypt(self, unencrypted, secret, tenant):
|
||||
"""Delegates encryption to active plugins."""
|
||||
if secret.mime_type == 'text/plain':
|
||||
unencrypted = unencrypted.encode('utf-8')
|
||||
|
||||
for ext in self.extensions:
|
||||
if ext.obj.supports(secret.mime_type):
|
||||
return ext.obj.encrypt(unencrypted, secret, tenant)
|
||||
datum = EncryptedDatum(secret)
|
||||
datum.cypher_text, datum.kek_metadata = ext.obj.encrypt(
|
||||
unencrypted, tenant
|
||||
)
|
||||
return datum
|
||||
else:
|
||||
raise CryptoMimeTypeNotSupportedException(secret.mime_type)
|
||||
|
||||
@@ -100,7 +108,12 @@ class CryptoExtensionManager(named.NamedExtensionManager):
|
||||
if ext.obj.supports(accept):
|
||||
for datum in secret.encrypted_data:
|
||||
if accept == datum.mime_type:
|
||||
return ext.obj.decrypt(datum, tenant)
|
||||
unencrypted = ext.obj.decrypt(datum.cypher_text,
|
||||
datum.kek_metadata,
|
||||
tenant)
|
||||
if accept == 'text/plain':
|
||||
unencrypted = unencrypted.decode('utf-8')
|
||||
return unencrypted
|
||||
else:
|
||||
raise CryptoAcceptNotSupportedException(accept)
|
||||
|
||||
@@ -120,7 +133,11 @@ class CryptoExtensionManager(named.NamedExtensionManager):
|
||||
# secret algo type) uses a different plug in than that
|
||||
# used to encrypted the key.
|
||||
data_key = ext.obj.create(secret.mime_type)
|
||||
return ext.obj.encrypt(data_key, secret, tenant)
|
||||
datum = EncryptedDatum(secret)
|
||||
datum.cypher_text, datum.kek_metadata = ext.obj.encrypt(
|
||||
data_key, tenant
|
||||
)
|
||||
return datum
|
||||
else:
|
||||
raise CryptoMimeTypeNotSupportedException(secret.mime_type)
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ import abc
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto import Random
|
||||
|
||||
from barbican.model.models import EncryptedDatum
|
||||
from barbican.openstack.common import jsonutils as json
|
||||
|
||||
|
||||
@@ -28,25 +27,26 @@ class CryptoPluginBase(object):
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@abc.abstractmethod
|
||||
def encrypt(self, unencrypted, secret, tenant):
|
||||
"""Encrypt unencrypted data in the context of the provided
|
||||
secret and tenant.
|
||||
def encrypt(self, unencrypted, tenant):
|
||||
"""Encrypt unencrypted data in the context of the provided tenant.
|
||||
|
||||
:param unencrypted: byte data to be encrypted.
|
||||
:param secret: Secret associated with the unencrypted data.
|
||||
:param tenant: Tenant associated with the unencrypted data.
|
||||
:returns: EncryptedDatum containing the encrypted data.
|
||||
:raises: ValueError if unencrypted is not byte data
|
||||
|
||||
:returns: tuple -- contains the encrypted data and kek metadata.
|
||||
:raises: ValueError if unencrypted is not byte data.
|
||||
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def decrypt(self, encrypted_datum, tenant):
|
||||
def decrypt(self, encrypted, kek_metadata, tenant):
|
||||
"""Decrypt encrypted_datum in the context of the provided tenant.
|
||||
|
||||
:param encrypted_datum: EncryptedDatum object to be decrypted.
|
||||
:param encrypted: cyphertext to be decrypted.
|
||||
:param kek_metadata: metadata that was created by encryption.
|
||||
:param tenant: Tenant associated with the encrypted datum.
|
||||
:returns str -- unencrypted byte data
|
||||
|
||||
:returns: str -- unencrypted byte data
|
||||
|
||||
"""
|
||||
|
||||
@@ -79,27 +79,25 @@ class SimpleCryptoPlugin(CryptoPluginBase):
|
||||
unpadded = unencrypted[:-pad_length]
|
||||
return unpadded
|
||||
|
||||
def encrypt(self, unencrypted, secret, tenant):
|
||||
def encrypt(self, unencrypted, tenant):
|
||||
if not isinstance(unencrypted, str):
|
||||
raise ValueError('unencrypted data must be a byte type.')
|
||||
padded_data = self._pad(unencrypted)
|
||||
iv = Random.get_random_bytes(self.block_size)
|
||||
encryptor = AES.new(self.kek, AES.MODE_CBC, iv)
|
||||
cyphertext = iv + encryptor.encrypt(padded_data)
|
||||
|
||||
datum = EncryptedDatum(secret)
|
||||
datum.cypher_text = cyphertext
|
||||
datum.kek_metadata = json.dumps({
|
||||
cyphertext = iv + encryptor.encrypt(padded_data)
|
||||
kek_metadata = json.dumps({
|
||||
'plugin': 'SimpleCryptoPlugin',
|
||||
'encryption': 'aes-128-cbc',
|
||||
'kek': 'kek_id'
|
||||
})
|
||||
return datum
|
||||
|
||||
def decrypt(self, encrypted_datum, tenant):
|
||||
payload = encrypted_datum.cypher_text
|
||||
iv = payload[:self.block_size]
|
||||
cypher_text = payload[self.block_size:]
|
||||
return cyphertext, kek_metadata
|
||||
|
||||
def decrypt(self, encrypted, kek_metadata, tenant):
|
||||
iv = encrypted[:self.block_size]
|
||||
cypher_text = encrypted[self.block_size:]
|
||||
decryptor = AES.new(self.kek, AES.MODE_CBC, iv)
|
||||
padded_secret = decryptor.decrypt(cypher_text)
|
||||
return self._strip_pad(padded_secret)
|
||||
|
||||
@@ -18,20 +18,18 @@ from mock import MagicMock
|
||||
import unittest
|
||||
|
||||
from barbican.crypto.plugin import CryptoPluginBase, SimpleCryptoPlugin
|
||||
from barbican.model.models import EncryptedDatum
|
||||
from barbican.openstack.common import jsonutils as json
|
||||
|
||||
|
||||
class TestCryptoPlugin(CryptoPluginBase):
|
||||
"""Crypto plugin implementation for testing the plugin manager."""
|
||||
|
||||
def encrypt(self, unencrypted, secret, tenant):
|
||||
datum = EncryptedDatum(secret)
|
||||
datum.cypher_text = 'cypher_text'
|
||||
datum.kek_metadata = json.dumps({'plugin': 'TestCryptoPlugin'})
|
||||
return datum
|
||||
def encrypt(self, unencrypted, tenant):
|
||||
cypher_text = 'cypher_text'
|
||||
kek_metadata = json.dumps({'plugin': 'TestCryptoPlugin'})
|
||||
return cypher_text, kek_metadata
|
||||
|
||||
def decrypt(self, encrypted_datum, tenant):
|
||||
def decrypt(self, encrypted, kek_metadata, tenant):
|
||||
return 'plain-data'
|
||||
|
||||
def create(self, secret_type):
|
||||
@@ -77,20 +75,16 @@ class WhenTestingSimpleCryptoPlugin(unittest.TestCase):
|
||||
secret = MagicMock()
|
||||
secret.mime_type = 'text/plain'
|
||||
with self.assertRaises(ValueError):
|
||||
self.plugin.encrypt(unencrypted, secret, MagicMock())
|
||||
self.plugin.encrypt(unencrypted, MagicMock())
|
||||
|
||||
def test_byte_string_encryption(self):
|
||||
unencrypted = b'some_secret'
|
||||
secret = MagicMock()
|
||||
secret.mime_type = 'text/plain'
|
||||
encrypted_datum = self.plugin.encrypt(unencrypted, secret, MagicMock())
|
||||
decrypted = self.plugin.decrypt(encrypted_datum, MagicMock())
|
||||
encrypted, kek_metadata = self.plugin.encrypt(unencrypted, MagicMock())
|
||||
decrypted = self.plugin.decrypt(encrypted, kek_metadata, MagicMock())
|
||||
self.assertEqual(unencrypted, decrypted)
|
||||
|
||||
def test_random_bytes_encryption(self):
|
||||
unencrypted = Random.get_random_bytes(10)
|
||||
secret = MagicMock()
|
||||
secret.mime_type = 'text/plain'
|
||||
encrypted_datum = self.plugin.encrypt(unencrypted, secret, MagicMock())
|
||||
decrypted = self.plugin.decrypt(encrypted_datum, MagicMock())
|
||||
encrypted, kek_metadata = self.plugin.encrypt(unencrypted, MagicMock())
|
||||
decrypted = self.plugin.decrypt(encrypted, kek_metadata, MagicMock())
|
||||
self.assertEqual(unencrypted, decrypted)
|
||||
|
||||
Reference in New Issue
Block a user