Extend crypto plugin to support more key types
This change is part of "api-orders-add-more-types" BP phase 1 plan. As per phase 1 we will extend the order resource to support asymmetric and more symmetric keys. This specific change is to address the crypto plug-in and extension manager. Partial-BP: api-orders-add-more-types Change-Id: Idf0073c0703dff652e55e811538f86a8f7c7259b
This commit is contained in:
parent
1a6e18eb45
commit
d864b87386
@ -71,10 +71,14 @@ def create_secret(data, tenant, crypto_manager,
|
||||
|
||||
elif ok_to_generate:
|
||||
LOG.debug('Generating new secret...')
|
||||
new_datum = crypto_manager.generate_data_encryption_key(new_secret,
|
||||
content_type,
|
||||
tenant,
|
||||
kek_repo)
|
||||
# TODO (atiwari): With new typed Order API proposal
|
||||
# we need to translate new_secret to meta
|
||||
# currently it is working as meta will have same attributes
|
||||
new_datum = crypto_manager. \
|
||||
generate_symmetric_encryption_key(new_secret,
|
||||
content_type,
|
||||
tenant,
|
||||
kek_repo)
|
||||
time_keeper.mark('after secret generate')
|
||||
|
||||
else:
|
||||
|
@ -26,6 +26,7 @@ from pki.kraclient import KRAClient
|
||||
|
||||
from barbican.crypto import plugin
|
||||
from barbican.openstack.common.gettextutils import _
|
||||
from barbican.common import exception
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
@ -56,6 +57,10 @@ CONF.register_group(dogtag_crypto_plugin_group)
|
||||
CONF.register_opts(dogtag_crypto_plugin_opts, group=dogtag_crypto_plugin_group)
|
||||
|
||||
|
||||
class DogtagPluginAlgorithmException(exception.BarbicanException):
|
||||
message = _("Invalid algorithm passed in")
|
||||
|
||||
|
||||
class DogtagCryptoPlugin(plugin.CryptoPluginBase):
|
||||
"""Dogtag implementation of the crypto plugin with DRM as the backend"""
|
||||
|
||||
@ -138,7 +143,7 @@ class DogtagCryptoPlugin(plugin.CryptoPluginBase):
|
||||
encrypt_dto.unencrypted,
|
||||
key_algorithm=None,
|
||||
key_size=None)
|
||||
return response.get_key_id(), None
|
||||
return plugin.ResponseDTO(response.get_key_id(), None)
|
||||
|
||||
def decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended,
|
||||
keystone_id):
|
||||
@ -173,7 +178,7 @@ class DogtagCryptoPlugin(plugin.CryptoPluginBase):
|
||||
"""
|
||||
return kek_meta_dto
|
||||
|
||||
def generate(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
def generate_symmetric(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
"""
|
||||
Generate a symmetric key
|
||||
|
||||
@ -189,18 +194,59 @@ class DogtagCryptoPlugin(plugin.CryptoPluginBase):
|
||||
key.SymKeyGenerationRequest.ENCRYPT_USAGE]
|
||||
|
||||
client_key_id = uuid.uuid4().hex
|
||||
response = self.keyclient.generate_symmetric_key(
|
||||
client_key_id, generate_dto.algorithm.upper(),
|
||||
generate_dto.bit_length, usages)
|
||||
return response.get_key_id(), None
|
||||
algorithm = self._map_algorithm(generate_dto.algorithm.lower())
|
||||
|
||||
def supports(self, type_enum, algorithm=None, mode=None):
|
||||
if algorithm is None:
|
||||
raise DogtagPluginAlgorithmException
|
||||
|
||||
response = self.keyclient.generate_symmetric_key(
|
||||
client_key_id,
|
||||
algorithm,
|
||||
generate_dto.bit_length,
|
||||
usages)
|
||||
return plugin.ResponseDTO(response.get_key_id(), None)
|
||||
|
||||
def generate_asymmetric(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
"""
|
||||
Generate a asymmetric key
|
||||
"""
|
||||
raise NotImplementedError("Feature not implemented for dogtag crypto")
|
||||
|
||||
def supports(self, type_enum, algorithm=None, bit_length=None,
|
||||
mode=None):
|
||||
"""
|
||||
Specifies what operations the plugin supports
|
||||
"""
|
||||
if type_enum == plugin.PluginSupportTypes.ENCRYPT_DECRYPT:
|
||||
return True
|
||||
elif type_enum == plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION:
|
||||
return True
|
||||
return self._is_algorithm_supported(algorithm,
|
||||
bit_length)
|
||||
elif type_enum == plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _map_algorithm(algorithm):
|
||||
"""
|
||||
Map Barbican algorithms to Dogtag plugin algorithms
|
||||
"""
|
||||
if algorithm == "aes":
|
||||
return key.KeyClient.AES_ALGORITHM
|
||||
elif algorithm == "des":
|
||||
return key.KeyClient.DES_ALGORITHM
|
||||
elif algorithm == "3des":
|
||||
return key.KeyClient.DES3_ALGORITHM
|
||||
else:
|
||||
return None
|
||||
|
||||
def _is_algorithm_supported(self, algorithm, bit_length=None):
|
||||
"""
|
||||
check if algorithm and bit length are supported
|
||||
|
||||
For now, we will just check the algorithm. When dogtag adds a
|
||||
call to check the bit length per algorithm, we can modify to
|
||||
make that call
|
||||
"""
|
||||
return self._map_algorithm(algorithm) is not None
|
||||
|
@ -295,8 +295,8 @@ class CryptoExtensionManager(named.NamedExtensionManager):
|
||||
else:
|
||||
raise CryptoPluginNotFound()
|
||||
|
||||
def generate_data_encryption_key(self, secret, content_type, tenant,
|
||||
kek_repo):
|
||||
def generate_symmetric_encryption_key(self, secret, content_type, tenant,
|
||||
kek_repo):
|
||||
"""Delegates generating a key to the first supported plugin.
|
||||
|
||||
Note that this key can be used by clients for their encryption
|
||||
@ -304,16 +304,8 @@ class CryptoExtensionManager(named.NamedExtensionManager):
|
||||
the plug-in key encryption process, and that encrypted datum
|
||||
is then returned from this method.
|
||||
"""
|
||||
if len(self.extensions) < 1:
|
||||
raise CryptoPluginNotFound()
|
||||
|
||||
generation_type = self._determine_type(secret.algorithm)
|
||||
for ext in self.extensions:
|
||||
if ext.obj.supports(generation_type, secret.algorithm):
|
||||
encrypting_plugin = ext.obj
|
||||
break
|
||||
else:
|
||||
raise CryptoSupportedPluginNotFound()
|
||||
encrypting_plugin = \
|
||||
self._determine_crypto_plugin(secret.algorithm)
|
||||
|
||||
kek_datum, kek_meta_dto = self._find_or_create_kek_objects(
|
||||
encrypting_plugin, tenant, kek_repo)
|
||||
@ -322,30 +314,108 @@ class CryptoExtensionManager(named.NamedExtensionManager):
|
||||
datum = models.EncryptedDatum(secret, kek_datum)
|
||||
datum.content_type = content_type
|
||||
|
||||
generate_dto = plugin_mod.GenerateDTO(generation_type,
|
||||
secret.algorithm,
|
||||
generate_dto = plugin_mod.GenerateDTO(secret.algorithm,
|
||||
secret.bit_length,
|
||||
secret.mode)
|
||||
# Create the encrypted secret.
|
||||
datum.cypher_text, datum.kek_meta_extended =\
|
||||
encrypting_plugin.generate(
|
||||
generate_dto, kek_meta_dto, tenant.keystone_id)
|
||||
secret.mode, None)
|
||||
# Create the encrypted meta.
|
||||
response_dto = encrypting_plugin.generate_symmetric(generate_dto,
|
||||
kek_meta_dto,
|
||||
tenant.keystone_id)
|
||||
|
||||
# Convert binary data into a text-based format.
|
||||
# TODO(jwood) Figure out by storing binary (BYTEA) data in Postgres
|
||||
# isn't working.
|
||||
datum.cypher_text = base64.b64encode(datum.cypher_text)
|
||||
|
||||
datum.cypher_text = base64.b64encode(response_dto.cypher_text)
|
||||
datum.kek_meta_extended = response_dto.kek_meta_extended
|
||||
return datum
|
||||
|
||||
def generate_asymmetric_encryption_keys(self, meta, content_type, tenant,
|
||||
kek_repo):
|
||||
"""Delegates generating a asymmteric keys to the first
|
||||
supported plugin based on `meta`. meta will provide extra
|
||||
information to help key generation.
|
||||
Based on passpharse in meta this method will return a tuple
|
||||
with two/three objects.
|
||||
|
||||
Note that this key can be used by clients for their encryption
|
||||
processes. This generated key is then be encrypted via
|
||||
the plug-in key encryption process, and that encrypted datum
|
||||
is then returned from this method.
|
||||
"""
|
||||
encrypting_plugin = \
|
||||
self._determine_crypto_plugin(meta.algorithm,
|
||||
meta.bit_length,
|
||||
meta.passphrase)
|
||||
|
||||
kek_datum, kek_meta_dto = self._find_or_create_kek_objects(
|
||||
encrypting_plugin, tenant, kek_repo)
|
||||
|
||||
generate_dto = plugin_mod.GenerateDTO(meta.algorithm,
|
||||
meta.bit_length,
|
||||
None, meta.passphrase)
|
||||
# generate the secret.
|
||||
private_key_dto, public_key_dto, passwd_dto = \
|
||||
encrypting_plugin.generate_asymmetric(
|
||||
generate_dto,
|
||||
kek_meta_dto,
|
||||
tenant.keystone_id)
|
||||
|
||||
# Create an encrypted datum instances for each secret type
|
||||
# and add the created cypher text.
|
||||
priv_datum = models.EncryptedDatum(None, kek_datum)
|
||||
priv_datum.content_type = content_type
|
||||
priv_datum.cypher_text = base64.b64encode(private_key_dto.cypher_text)
|
||||
priv_datum.kek_meta_extended = private_key_dto.kek_meta_extended
|
||||
|
||||
public_datum = models.EncryptedDatum(None, kek_datum)
|
||||
public_datum.content_type = content_type
|
||||
public_datum.cypher_text = base64.b64encode(public_key_dto.cypher_text)
|
||||
public_datum.kek_meta_extended = public_key_dto.kek_meta_extended
|
||||
|
||||
passwd_datum = None
|
||||
if passwd_dto:
|
||||
passwd_datum = models.EncryptedDatum(None, kek_datum)
|
||||
passwd_datum.content_type = content_type
|
||||
passwd_datum.cypher_text = base64.b64encode(passwd_dto.cypher_text)
|
||||
passwd_datum.kek_meta_extended = \
|
||||
passwd_dto.kek_meta_extended
|
||||
|
||||
return (priv_datum, public_datum, passwd_datum)
|
||||
|
||||
def _determine_type(self, algorithm):
|
||||
"""Determines the type (symmetric only for now) based on algorithm"""
|
||||
"""Determines the type (symmetric and asymmetric for now)
|
||||
based on algorithm"""
|
||||
symmetric_algs = plugin_mod.PluginSupportTypes.SYMMETRIC_ALGORITHMS
|
||||
asymmetric_algs = plugin_mod.PluginSupportTypes.ASYMMETRIC_ALGORITHMS
|
||||
if algorithm.lower() in symmetric_algs:
|
||||
return plugin_mod.PluginSupportTypes.SYMMETRIC_KEY_GENERATION
|
||||
elif algorithm.lower() in asymmetric_algs:
|
||||
return plugin_mod.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION
|
||||
else:
|
||||
raise CryptoAlgorithmNotSupportedException(algorithm)
|
||||
|
||||
#TODO(atiwari): Use meta object instead of individual attribute
|
||||
#This has to be done while integration rest resources
|
||||
def _determine_crypto_plugin(self, algorithm, bit_length=None,
|
||||
mode=None):
|
||||
"""Determines the generation type and encrypting plug-in
|
||||
which supports the generation of secret based on
|
||||
generation type"""
|
||||
if len(self.extensions) < 1:
|
||||
raise CryptoPluginNotFound()
|
||||
|
||||
generation_type = self._determine_type(algorithm)
|
||||
for ext in self.extensions:
|
||||
if ext.obj.supports(generation_type, algorithm,
|
||||
bit_length,
|
||||
mode):
|
||||
encrypting_plugin = ext.obj
|
||||
break
|
||||
else:
|
||||
raise CryptoSupportedPluginNotFound()
|
||||
|
||||
return encrypting_plugin
|
||||
|
||||
def _plugin_supports(self, plugin_inst, kek_metadata_tenant):
|
||||
"""Tests for plugin support.
|
||||
|
||||
|
@ -135,7 +135,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
'iv': base64.b64encode(iv)
|
||||
})
|
||||
|
||||
return cyphertext, kek_meta_extended
|
||||
return plugin.ResponseDTO(cyphertext, kek_meta_extended)
|
||||
|
||||
def decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended,
|
||||
keystone_id):
|
||||
@ -162,17 +162,22 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
|
||||
return kek_meta_dto
|
||||
|
||||
def generate(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
def generate_symmetric(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
byte_length = generate_dto.bit_length / 8
|
||||
rand = self.session.generateRandom(byte_length)
|
||||
if len(rand) != byte_length:
|
||||
raise P11CryptoPluginException()
|
||||
return self.encrypt(plugin.EncryptDTO(rand), kek_meta_dto, keystone_id)
|
||||
|
||||
def supports(self, type_enum, algorithm=None, mode=None):
|
||||
def generate_asymmetric(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
raise NotImplementedError("Feature not implemented for PKCS11")
|
||||
|
||||
def supports(self, type_enum, algorithm=None, bit_length=None, mode=None):
|
||||
if type_enum == plugin.PluginSupportTypes.ENCRYPT_DECRYPT:
|
||||
return True
|
||||
elif type_enum == plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION:
|
||||
return True
|
||||
elif type_enum == plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
@ -16,12 +16,17 @@
|
||||
import abc
|
||||
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.PublicKey import DSA
|
||||
from Crypto.Util import asn1
|
||||
from Crypto import Random
|
||||
from oslo.config import cfg
|
||||
import six
|
||||
from barbican.common import utils
|
||||
|
||||
from barbican.openstack.common.gettextutils import _
|
||||
|
||||
LOG = utils.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
@ -41,7 +46,13 @@ class PluginSupportTypes(object):
|
||||
ENCRYPT_DECRYPT = "ENCRYPT_DECRYPT"
|
||||
SYMMETRIC_KEY_GENERATION = "SYMMETRIC_KEY_GENERATION"
|
||||
# A list of symmetric algorithms that are used to determine type of key gen
|
||||
SYMMETRIC_ALGORITHMS = ['aes', 'des']
|
||||
SYMMETRIC_ALGORITHMS = ['aes', 'des', '3des', 'hmacsha1',
|
||||
'hmacsha256', 'hmacsha384', 'hmacsha512']
|
||||
SYMMETRIC_KEY_LENGTHS = [64, 128, 192, 256]
|
||||
|
||||
ASYMMETRIC_KEY_GENERATION = "ASYMMETRIC_KEY_GENERATION"
|
||||
ASYMMETRIC_ALGORITHMS = ['rsa', 'dsa']
|
||||
ASYMMETRIC_KEY_LENGTHS = [1024, 2048, 4096]
|
||||
|
||||
|
||||
class KEKMetaDTO(object):
|
||||
@ -73,11 +84,22 @@ class GenerateDTO(object):
|
||||
is passed in instances of this object.
|
||||
"""
|
||||
|
||||
def __init__(self, generation_type, algorithm, bit_length, mode):
|
||||
self.generation_type = generation_type
|
||||
def __init__(self, algorithm, bit_length, mode, passphrase=None):
|
||||
self.algorithm = algorithm
|
||||
self.bit_length = bit_length
|
||||
self.mode = mode
|
||||
self.passphrase = passphrase
|
||||
|
||||
|
||||
class ResponseDTO(object):
|
||||
"""
|
||||
Data transfer object for secret generation response.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, cypher_text, kek_meta_extended=None):
|
||||
self.cypher_text = cypher_text
|
||||
self.kek_meta_extended = kek_meta_extended
|
||||
|
||||
|
||||
class DecryptDTO(object):
|
||||
@ -126,6 +148,8 @@ def indicate_bind_completed(kek_meta_dto, kek_datum):
|
||||
class CryptoPluginBase(object):
|
||||
"""Base class for Crypto plugins."""
|
||||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@abc.abstractmethod
|
||||
def encrypt(self, encrypt_dto, kek_meta_dto, keystone_id):
|
||||
"""Encrypt unencrypted data in the context of the provided tenant.
|
||||
@ -134,10 +158,10 @@ class CryptoPluginBase(object):
|
||||
to be encrypted.
|
||||
:param kek_meta_dto: Key encryption key metadata to use for encryption.
|
||||
:param keystone_id: keystone_id associated with the unencrypted data.
|
||||
:returns: encrypted data and kek_meta_extended, the former the
|
||||
resultant cypher text, the latter being optional per-secret metadata
|
||||
needed to decrypt (over and above the per-tenant metadata managed
|
||||
outside of the plugins)
|
||||
:returns: An object of type ResponseDTO containing encrypted data and
|
||||
kek_meta_extended, the former the resultant cypher text, the latter
|
||||
being optional per-secret metadata needed to decrypt (over and above
|
||||
the per-tenant metadata managed outside of the plugins)
|
||||
|
||||
"""
|
||||
raise NotImplementedError # pragma: no cover
|
||||
@ -181,7 +205,7 @@ class CryptoPluginBase(object):
|
||||
raise NotImplementedError # pragma: no cover
|
||||
|
||||
@abc.abstractmethod
|
||||
def generate(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
def generate_symmetric(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
"""
|
||||
Generate a new key.
|
||||
|
||||
@ -191,15 +215,36 @@ class CryptoPluginBase(object):
|
||||
bit_length, algorithm and mode
|
||||
:param kek_meta_dto: Key encryption key metadata to use for decryption
|
||||
:param keystone_id: keystone_id associated with the data.
|
||||
:returns: encrypted data and kek_meta_extended, the former the
|
||||
resultant cypher text, the latter being optional per-secret metadata
|
||||
needed to decrypt (over and above the per-tenant metadata managed
|
||||
outside of the plugins)
|
||||
:returns: An object of type ResponseDTO containing encrypted data and
|
||||
kek_meta_extended, the former the resultant cypher text, the latter
|
||||
being optional per-secret metadata needed to decrypt (over and above
|
||||
the per-tenant metadata managed outside of the plugins)
|
||||
"""
|
||||
raise NotImplementedError # pragma: no cover
|
||||
|
||||
@abc.abstractmethod
|
||||
def supports(self, type_enum, algorithm=None, mode=None):
|
||||
def generate_asymmetric(self, generate_dto,
|
||||
kek_meta_dto, keystone_id):
|
||||
"""Create a new asymmetric key.
|
||||
|
||||
:param generate_dto: data transfer object for the record
|
||||
associated with this generation request. Some relevant
|
||||
parameters can be extracted from this object, including
|
||||
bit_length, algorithm and passphrase
|
||||
:param kek_meta_dto: Key encryption key metadata to use for decryption
|
||||
:param keystone_id: keystone_id associated with the data.
|
||||
:returns: A tuple containing objects for private_key, public_key and
|
||||
optionally one for passphrase. The objects will be of type ResponseDTO.
|
||||
Each object containing encrypted data and kek_meta_extended, the former
|
||||
the resultant cypher text, the latter being optional per-secret
|
||||
metadata needed to decrypt (over and above the per-tenant metadata
|
||||
managed outside of the plugins)
|
||||
"""
|
||||
raise NotImplementedError # pragma: no cover
|
||||
|
||||
@abc.abstractmethod
|
||||
def supports(self, type_enum, algorithm=None, bit_length=None,
|
||||
mode=None):
|
||||
"""Used to determine if the plugin supports the requested operation.
|
||||
|
||||
:param type_enum: Enumeration from PluginSupportsType class
|
||||
@ -238,7 +283,7 @@ class SimpleCryptoPlugin(CryptoPluginBase):
|
||||
|
||||
cyphertext = iv + encryptor.encrypt(padded_data)
|
||||
|
||||
return cyphertext, None
|
||||
return ResponseDTO(cyphertext, None)
|
||||
|
||||
def decrypt(self, encrypted_dto, kek_meta_dto, kek_meta_extended,
|
||||
keystone_id):
|
||||
@ -256,15 +301,116 @@ class SimpleCryptoPlugin(CryptoPluginBase):
|
||||
kek_meta_dto.plugin_meta = None
|
||||
return kek_meta_dto
|
||||
|
||||
def generate(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
def generate_symmetric(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
byte_length = int(generate_dto.bit_length) / 8
|
||||
unencrypted = Random.get_random_bytes(byte_length)
|
||||
return self.encrypt(EncryptDTO(unencrypted), kek_meta_dto, keystone_id)
|
||||
|
||||
def supports(self, type_enum, algorithm=None, mode=None):
|
||||
return self.encrypt(EncryptDTO(unencrypted),
|
||||
kek_meta_dto,
|
||||
keystone_id)
|
||||
|
||||
def generate_asymmetric(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
"""Generate asymmetric keys based on below rule
|
||||
- RSA, with passphrase (supported)
|
||||
- RSA, without passphrase (supported)
|
||||
- DSA, without passphrase (supported)
|
||||
- DSA, with passphrase (not supported)
|
||||
|
||||
Note: PyCrypto is not capable of serializing DSA
|
||||
keys and DER formated keys. Such keys will be
|
||||
serialized to Base64 PEM to store in DB.
|
||||
|
||||
TODO (atiwari/reaperhulk): PyCrypto is not capable to serialize
|
||||
DSA keys and DER formated keys, later we need to pick better
|
||||
crypto lib.
|
||||
"""
|
||||
if generate_dto.algorithm is None\
|
||||
or generate_dto.algorithm.lower() == 'rsa':
|
||||
private_key = RSA.generate(
|
||||
generate_dto.bit_length, None, None, 65537)
|
||||
elif generate_dto.algorithm.lower() == 'dsa':
|
||||
private_key = DSA.generate(generate_dto.bit_length, None, None)
|
||||
|
||||
public_key = private_key.publickey()
|
||||
|
||||
#Note (atiwari): key wrapping format PEM only supported
|
||||
if generate_dto.algorithm.lower() == 'rsa':
|
||||
public_key, private_key = self._wrap_key(public_key, private_key,
|
||||
generate_dto.passphrase)
|
||||
if generate_dto.algorithm.lower() == 'dsa':
|
||||
if generate_dto.passphrase:
|
||||
raise ValueError('Passphrase not supported for DSA key')
|
||||
public_key, private_key = self._serialize_dsa_key(public_key,
|
||||
private_key)
|
||||
private_dto = self.encrypt(EncryptDTO(private_key),
|
||||
kek_meta_dto,
|
||||
keystone_id)
|
||||
|
||||
public_dto = self.encrypt(EncryptDTO(public_key),
|
||||
kek_meta_dto,
|
||||
keystone_id)
|
||||
|
||||
passphrase_dto = None
|
||||
if generate_dto.passphrase:
|
||||
passphrase_dto = self.encrypt(EncryptDTO(generate_dto.passphrase),
|
||||
kek_meta_dto,
|
||||
keystone_id)
|
||||
|
||||
return private_dto, public_dto, passphrase_dto
|
||||
|
||||
def supports(self, type_enum, algorithm=None, bit_length=None,
|
||||
mode=None):
|
||||
if type_enum == PluginSupportTypes.ENCRYPT_DECRYPT:
|
||||
return True
|
||||
elif type_enum == PluginSupportTypes.SYMMETRIC_KEY_GENERATION:
|
||||
|
||||
if type_enum == PluginSupportTypes.SYMMETRIC_KEY_GENERATION:
|
||||
return self._is_algorithm_supported(algorithm,
|
||||
bit_length)
|
||||
elif type_enum == PluginSupportTypes.ASYMMETRIC_KEY_GENERATION:
|
||||
return self._is_algorithm_supported(algorithm,
|
||||
bit_length)
|
||||
else:
|
||||
return False
|
||||
|
||||
def _wrap_key(self, public_key, private_key,
|
||||
passphrase):
|
||||
pkcs = 8
|
||||
key_wrap_format = 'PEM'
|
||||
|
||||
private_key = private_key.exportKey(key_wrap_format, passphrase, pkcs)
|
||||
public_key = public_key.exportKey()
|
||||
|
||||
return (public_key, private_key)
|
||||
|
||||
def _serialize_dsa_key(self, public_key, private_key):
|
||||
|
||||
pub_seq = asn1.DerSequence()
|
||||
pub_seq[:] = [0, public_key.p, public_key.q,
|
||||
public_key.g, public_key.y]
|
||||
public_key = "-----BEGIN DSA PUBLIC KEY-----\n%s"\
|
||||
"-----END DSA PUBLIC KEY-----" % pub_seq.encode().encode("base64")
|
||||
|
||||
prv_seq = asn1.DerSequence()
|
||||
prv_seq[:] = [0, private_key.p, private_key.q,
|
||||
private_key.g, private_key.y, private_key.x]
|
||||
private_key = "-----BEGIN DSA PRIVATE KEY-----\n%s"\
|
||||
"-----END DSA PRIVATE KEY-----" % prv_seq.encode().encode("base64")
|
||||
|
||||
return (public_key, private_key)
|
||||
|
||||
def _is_algorithm_supported(self, algorithm=None, bit_length=None):
|
||||
"""
|
||||
check if algorithm and bit_length combination is supported
|
||||
|
||||
"""
|
||||
if algorithm is None or bit_length is None:
|
||||
return False
|
||||
|
||||
if algorithm.lower() in PluginSupportTypes.SYMMETRIC_ALGORITHMS \
|
||||
and bit_length in PluginSupportTypes.SYMMETRIC_KEY_LENGTHS:
|
||||
return True
|
||||
elif algorithm.lower() in PluginSupportTypes.ASYMMETRIC_ALGORITHMS \
|
||||
and bit_length in PluginSupportTypes.ASYMMETRIC_KEY_LENGTHS:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
@ -21,6 +21,7 @@ import testtools
|
||||
try:
|
||||
from barbican.crypto import plugin as plugin_import
|
||||
from barbican.crypto.dogtag_crypto import DogtagCryptoPlugin
|
||||
from barbican.crypto.dogtag_crypto import DogtagPluginAlgorithmException
|
||||
from barbican.model import models
|
||||
imports_ok = True
|
||||
except:
|
||||
@ -66,7 +67,7 @@ class WhenTestingDogtagCryptoPlugin(testtools.TestCase):
|
||||
secret.algorithm,
|
||||
secret.bit_length,
|
||||
None)
|
||||
_encrypted, _kek_ext = self.plugin.generate(
|
||||
self.plugin.generate_symmetric(
|
||||
generate_dto,
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock()
|
||||
@ -78,6 +79,25 @@ class WhenTestingDogtagCryptoPlugin(testtools.TestCase):
|
||||
secret.bit_length,
|
||||
mock.ANY)
|
||||
|
||||
def test_generate_non_supported_algorithm(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
secret = models.Secret()
|
||||
secret.bit_length = 128
|
||||
secret.algorithm = "hmacsha256"
|
||||
generate_dto = plugin_import.GenerateDTO(
|
||||
plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
|
||||
secret.algorithm,
|
||||
secret.bit_length,
|
||||
None)
|
||||
self.assertRaises(
|
||||
DogtagPluginAlgorithmException,
|
||||
self.plugin.generate_symmetric,
|
||||
generate_dto,
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock()
|
||||
)
|
||||
|
||||
def test_raises_error_with_no_pem_path(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
@ -156,6 +176,25 @@ class WhenTestingDogtagCryptoPlugin(testtools.TestCase):
|
||||
)
|
||||
)
|
||||
|
||||
def test_supports_symmetric_hmacsha256_key_generation(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
self.assertFalse(
|
||||
self.plugin.supports(
|
||||
plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
|
||||
'hmacsha256', 128
|
||||
)
|
||||
)
|
||||
|
||||
def test_supports_asymmetric_key_generation(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
self.assertFalse(
|
||||
self.plugin.supports(
|
||||
plugin_import.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION
|
||||
)
|
||||
)
|
||||
|
||||
def test_does_not_support_unknown_type(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
|
@ -19,7 +19,8 @@ import testtools
|
||||
|
||||
from barbican.crypto import extension_manager as em
|
||||
from barbican.crypto import mime_types as mt
|
||||
from barbican.crypto.plugin import CryptoPluginBase, PluginSupportTypes
|
||||
from barbican.crypto.plugin import CryptoPluginBase, PluginSupportTypes, \
|
||||
SimpleCryptoPlugin
|
||||
|
||||
|
||||
class TestSupportsCryptoPlugin(CryptoPluginBase):
|
||||
@ -34,10 +35,15 @@ class TestSupportsCryptoPlugin(CryptoPluginBase):
|
||||
def bind_kek_metadata(self, kek_meta_dto):
|
||||
return None
|
||||
|
||||
def generate(self, generate_dto, type_enum, kek_meta_dto, keystone_id):
|
||||
def generate_symmetric(self, generate_dto, type_enum,
|
||||
kek_meta_dto, keystone_id):
|
||||
raise NotImplementedError()
|
||||
|
||||
def supports(self, type_enum, algorithm=None, mode=None):
|
||||
def generate_asymmetric(self, generate_dto, type_enum,
|
||||
kek_meta_dto, keystone_id):
|
||||
raise NotImplementedError("Feature not implemented for PKCS11")
|
||||
|
||||
def supports(self, type_enum, algorithm=None, bit_length=None, mode=None):
|
||||
return False
|
||||
|
||||
|
||||
@ -234,6 +240,13 @@ class WhenTestingCryptoExtensionManager(testtools.TestCase):
|
||||
'faux_alg',
|
||||
)
|
||||
|
||||
def test_create_asymmetric_supported_algorithm(self):
|
||||
skg = PluginSupportTypes.ASYMMETRIC_KEY_GENERATION
|
||||
self.assertEqual(skg, self.manager._determine_type('RSA'))
|
||||
self.assertEqual(skg, self.manager._determine_type('rsa'))
|
||||
self.assertEqual(skg, self.manager._determine_type('DSA'))
|
||||
self.assertEqual(skg, self.manager._determine_type('dsa'))
|
||||
|
||||
def test_encrypt_no_plugin_found(self):
|
||||
self.manager.extensions = []
|
||||
self.assertRaises(
|
||||
@ -295,7 +308,7 @@ class WhenTestingCryptoExtensionManager(testtools.TestCase):
|
||||
self.manager.extensions = []
|
||||
self.assertRaises(
|
||||
em.CryptoPluginNotFound,
|
||||
self.manager.generate_data_encryption_key,
|
||||
self.manager.generate_symmetric_encryption_key,
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock(),
|
||||
@ -308,7 +321,7 @@ class WhenTestingCryptoExtensionManager(testtools.TestCase):
|
||||
self.manager.extensions = [plugin_mock]
|
||||
self.assertRaises(
|
||||
em.CryptoSupportedPluginNotFound,
|
||||
self.manager.generate_data_encryption_key,
|
||||
self.manager.generate_symmetric_encryption_key,
|
||||
mock.MagicMock(algorithm='AES'),
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock(),
|
||||
@ -338,3 +351,85 @@ class WhenTestingCryptoExtensionManager(testtools.TestCase):
|
||||
kek_repo
|
||||
)
|
||||
kek_repo.save.assert_called_once()
|
||||
|
||||
def generate_asymmetric_encryption_keys_no_plugin_found(self):
|
||||
self.manager.extensions = []
|
||||
self.assertRaises(
|
||||
em.CryptoPluginNotFound,
|
||||
self.manager.generate_asymmetric_encryption_keys,
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock(),
|
||||
)
|
||||
|
||||
def generate_asymmetric_encryption_keys_no_supported_plugin(self):
|
||||
plugin = TestSupportsCryptoPlugin()
|
||||
plugin_mock = mock.MagicMock(obj=plugin)
|
||||
self.manager.extensions = [plugin_mock]
|
||||
self.assertRaises(
|
||||
em.CryptoSupportedPluginNotFound,
|
||||
self.manager.generate_asymmetric_encryption_keys,
|
||||
mock.MagicMock(algorithm='DSA'),
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock()
|
||||
)
|
||||
|
||||
def generate_asymmetric_encryption_rsa_keys_ensure_encoding(self):
|
||||
plugin = SimpleCryptoPlugin()
|
||||
plugin_mock = mock.MagicMock(obj=plugin)
|
||||
self.manager.extensions = [plugin_mock]
|
||||
|
||||
meta = mock.MagicMock(algorithm='rsa',
|
||||
bit_length=1024,
|
||||
passphrase=None)
|
||||
|
||||
private_datum, public_datum, passphrase_datum = \
|
||||
self.manager.generate_asymmetric_encryption_keys(meta,
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock())
|
||||
self.assertIsNotNone(private_datum)
|
||||
self.assertIsNotNone(public_datum)
|
||||
self.assertIsNone(passphrase_datum)
|
||||
|
||||
try:
|
||||
base64.b64decode(private_datum.cypher_text)
|
||||
base64.b64decode(public_datum.cypher_text)
|
||||
if passphrase_datum:
|
||||
base64.b64decode(passphrase_datum.cypher_text)
|
||||
isB64Encoding = True
|
||||
except Exception:
|
||||
isB64Encoding = False
|
||||
|
||||
self.assertTrue(isB64Encoding)
|
||||
|
||||
def generate_asymmetric_encryption_dsa_keys_ensure_encoding(self):
|
||||
plugin = SimpleCryptoPlugin()
|
||||
plugin_mock = mock.MagicMock(obj=plugin)
|
||||
self.manager.extensions = [plugin_mock]
|
||||
|
||||
meta = mock.MagicMock(algorithm='rsa',
|
||||
bit_length=1024,
|
||||
passphrase=None)
|
||||
|
||||
private_datum, public_datum, passphrase_datum = \
|
||||
self.manager.generate_asymmetric_encryption_keys(meta,
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock())
|
||||
self.assertIsNotNone(private_datum)
|
||||
self.assertIsNotNone(public_datum)
|
||||
self.assertIsNone(passphrase_datum)
|
||||
|
||||
try:
|
||||
base64.b64decode(private_datum.cypher_text)
|
||||
base64.b64decode(public_datum.cypher_text)
|
||||
if passphrase_datum:
|
||||
base64.b64decode(passphrase_datum.cypher_text)
|
||||
isB64Encoding = True
|
||||
except Exception:
|
||||
isB64Encoding = False
|
||||
|
||||
self.assertTrue(isB64Encoding)
|
||||
|
@ -51,11 +51,10 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase):
|
||||
secret.bit_length = 128
|
||||
secret.algorithm = "AES"
|
||||
generate_dto = plugin_import.GenerateDTO(
|
||||
plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
|
||||
secret.algorithm,
|
||||
secret.bit_length,
|
||||
None)
|
||||
encrypted, kek_ext = self.plugin.generate(
|
||||
None, None)
|
||||
self.plugin.generate_symmetric(
|
||||
generate_dto,
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock()
|
||||
@ -68,13 +67,12 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase):
|
||||
secret.bit_length = 192
|
||||
secret.algorithm = "AES"
|
||||
generate_dto = plugin_import.GenerateDTO(
|
||||
plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
|
||||
secret.algorithm,
|
||||
secret.bit_length,
|
||||
None)
|
||||
None, None)
|
||||
self.assertRaises(
|
||||
p11_crypto.P11CryptoPluginException,
|
||||
self.plugin.generate,
|
||||
self.plugin.generate_symmetric,
|
||||
generate_dto,
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock()
|
||||
@ -168,16 +166,16 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase):
|
||||
self.p11_mock.Mechanism.return_value = mech
|
||||
self.session.encrypt.return_value = [1, 2, 3, 4, 5]
|
||||
encrypt_dto = plugin_import.EncryptDTO(payload)
|
||||
cyphertext, kek_meta_extended = self.plugin.encrypt(encrypt_dto,
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock())
|
||||
response_dto = self.plugin.encrypt(encrypt_dto,
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock())
|
||||
|
||||
self.session.encrypt.assert_called_once_with(key,
|
||||
payload,
|
||||
mech)
|
||||
self.assertEqual(b'\x01\x02\x03\x04\x05', cyphertext)
|
||||
self.assertEqual(b'\x01\x02\x03\x04\x05', response_dto.cypher_text)
|
||||
self.assertEqual('{"iv": "AQIDBAUGBwgJCgsMDQ4PEA=="}',
|
||||
kek_meta_extended)
|
||||
response_dto.kek_meta_extended)
|
||||
|
||||
def test_decrypt(self):
|
||||
key = 'key1'
|
||||
|
@ -14,6 +14,10 @@
|
||||
# limitations under the License.
|
||||
|
||||
from Crypto import Random
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.PublicKey import DSA
|
||||
from Crypto.Util import asn1
|
||||
|
||||
from mock import MagicMock
|
||||
|
||||
import testtools
|
||||
@ -40,10 +44,15 @@ class TestCryptoPlugin(plugin.CryptoPluginBase):
|
||||
kek_meta_dto.plugin_meta = None
|
||||
return kek_meta_dto
|
||||
|
||||
def generate(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
return "encrypted insecure key", None
|
||||
def generate_symmetric(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
return plugin.ResponseDTO("encrypted insecure key", None)
|
||||
|
||||
def supports(self, type_enum, algorithm=None, mode=None):
|
||||
def generate_asymmetric(self, generate_dto, kek_meta_dto, keystone_id):
|
||||
return (plugin.ResponseDTO('insecure_private_key', None),
|
||||
plugin.ResponseDTO('insecure_public_key', None),
|
||||
None)
|
||||
|
||||
def supports(self, type_enum, algorithm=None, bit_length=None, mode=None):
|
||||
if type_enum == plugin.PluginSupportTypes.ENCRYPT_DECRYPT:
|
||||
return True
|
||||
elif type_enum == plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION:
|
||||
@ -100,22 +109,25 @@ class WhenTestingSimpleCryptoPlugin(testtools.TestCase):
|
||||
def test_byte_string_encryption(self):
|
||||
unencrypted = b'some_secret'
|
||||
encrypt_dto = plugin.EncryptDTO(unencrypted)
|
||||
encrypted, kek_ext = self.plugin.encrypt(encrypt_dto,
|
||||
MagicMock(),
|
||||
MagicMock())
|
||||
decrypt_dto = plugin.DecryptDTO(encrypted)
|
||||
response_dto = self.plugin.encrypt(encrypt_dto,
|
||||
MagicMock(),
|
||||
MagicMock())
|
||||
decrypt_dto = plugin.DecryptDTO(response_dto.cypher_text)
|
||||
decrypted = self.plugin.decrypt(decrypt_dto, MagicMock(),
|
||||
kek_ext, MagicMock())
|
||||
response_dto.kek_meta_extended,
|
||||
MagicMock())
|
||||
self.assertEqual(unencrypted, decrypted)
|
||||
|
||||
def test_random_bytes_encryption(self):
|
||||
unencrypted = Random.get_random_bytes(10)
|
||||
encrypt_dto = plugin.EncryptDTO(unencrypted)
|
||||
encrypted, kek_meta_ext = self.plugin.encrypt(encrypt_dto,
|
||||
MagicMock(), MagicMock())
|
||||
decrypt_dto = plugin.DecryptDTO(encrypted)
|
||||
response_dto = self.plugin.encrypt(encrypt_dto,
|
||||
MagicMock(),
|
||||
MagicMock())
|
||||
decrypt_dto = plugin.DecryptDTO(response_dto.cypher_text)
|
||||
decrypted = self.plugin.decrypt(decrypt_dto, MagicMock(),
|
||||
kek_meta_ext, MagicMock())
|
||||
response_dto.kek_meta_extended,
|
||||
MagicMock())
|
||||
self.assertEqual(unencrypted, decrypted)
|
||||
|
||||
def test_generate_256_bit_key(self):
|
||||
@ -123,18 +135,17 @@ class WhenTestingSimpleCryptoPlugin(testtools.TestCase):
|
||||
secret.bit_length = 256
|
||||
secret.algorithm = "AES"
|
||||
generate_dto = plugin.GenerateDTO(
|
||||
plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
|
||||
secret.algorithm,
|
||||
secret.bit_length,
|
||||
secret.mode)
|
||||
encrypted, kek_ext = self.plugin.generate(
|
||||
secret.mode, None)
|
||||
response_dto = self.plugin.generate_symmetric(
|
||||
generate_dto,
|
||||
MagicMock(),
|
||||
MagicMock()
|
||||
)
|
||||
decrypt_dto = plugin.DecryptDTO(encrypted)
|
||||
decrypt_dto = plugin.DecryptDTO(response_dto.cypher_text)
|
||||
key = self.plugin.decrypt(decrypt_dto, MagicMock(),
|
||||
kek_ext, MagicMock())
|
||||
response_dto.kek_meta_extended, MagicMock())
|
||||
self.assertEqual(len(key), 32)
|
||||
|
||||
def test_generate_192_bit_key(self):
|
||||
@ -142,18 +153,17 @@ class WhenTestingSimpleCryptoPlugin(testtools.TestCase):
|
||||
secret.bit_length = 192
|
||||
secret.algorithm = "AES"
|
||||
generate_dto = plugin.GenerateDTO(
|
||||
plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
|
||||
secret.algorithm,
|
||||
secret.bit_length,
|
||||
None)
|
||||
encrypted, kek_ext = self.plugin.generate(
|
||||
None, None)
|
||||
response_dto = self.plugin.generate_symmetric(
|
||||
generate_dto,
|
||||
MagicMock(),
|
||||
MagicMock()
|
||||
)
|
||||
decrypt_dto = plugin.DecryptDTO(encrypted)
|
||||
decrypt_dto = plugin.DecryptDTO(response_dto.cypher_text)
|
||||
key = self.plugin.decrypt(decrypt_dto, MagicMock(),
|
||||
kek_ext, MagicMock())
|
||||
response_dto.kek_meta_extended, MagicMock())
|
||||
self.assertEqual(len(key), 24)
|
||||
|
||||
def test_generate_128_bit_key(self):
|
||||
@ -161,18 +171,17 @@ class WhenTestingSimpleCryptoPlugin(testtools.TestCase):
|
||||
secret.bit_length = 128
|
||||
secret.algorithm = "AES"
|
||||
generate_dto = plugin.GenerateDTO(
|
||||
plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
|
||||
secret.algorithm,
|
||||
secret.bit_length,
|
||||
None)
|
||||
encrypted, kek_ext = self.plugin.generate(
|
||||
None, None)
|
||||
response_dto = self.plugin.generate_symmetric(
|
||||
generate_dto,
|
||||
MagicMock(),
|
||||
MagicMock()
|
||||
)
|
||||
decrypt_dto = plugin.DecryptDTO(encrypted)
|
||||
decrypt_dto = plugin.DecryptDTO(response_dto.cypher_text)
|
||||
key = self.plugin.decrypt(decrypt_dto, MagicMock(),
|
||||
kek_ext, MagicMock())
|
||||
response_dto.kek_meta_extended, MagicMock())
|
||||
self.assertEqual(len(key), 16)
|
||||
|
||||
def test_supports_encrypt_decrypt(self):
|
||||
@ -183,7 +192,26 @@ class WhenTestingSimpleCryptoPlugin(testtools.TestCase):
|
||||
def test_supports_symmetric_key_generation(self):
|
||||
self.assertTrue(
|
||||
self.plugin.supports(
|
||||
plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION)
|
||||
plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, 'AES', 64)
|
||||
)
|
||||
self.assertFalse(
|
||||
self.plugin.supports(
|
||||
plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, 'AES')
|
||||
)
|
||||
self.assertTrue(
|
||||
self.plugin.supports(
|
||||
plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
|
||||
'hmacsha512', 128)
|
||||
)
|
||||
self.assertFalse(
|
||||
self.plugin.supports(
|
||||
plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
|
||||
'hmacsha512', 12)
|
||||
)
|
||||
self.assertFalse(
|
||||
self.plugin.supports(
|
||||
plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
|
||||
'Camillia', 128)
|
||||
)
|
||||
|
||||
def test_does_not_support_unknown_type(self):
|
||||
@ -199,3 +227,128 @@ class WhenTestingSimpleCryptoPlugin(testtools.TestCase):
|
||||
self.assertEqual(kek_metadata_dto.bit_length, 128)
|
||||
self.assertEqual(kek_metadata_dto.mode, 'cbc')
|
||||
self.assertIsNone(kek_metadata_dto.plugin_meta)
|
||||
|
||||
def test_supports_asymmetric_key_generation(self):
|
||||
self.assertTrue(
|
||||
self.plugin.supports(
|
||||
plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION,
|
||||
'DSA', 1024)
|
||||
)
|
||||
self.assertTrue(
|
||||
self.plugin.supports(
|
||||
plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION,
|
||||
"RSA", 1024)
|
||||
)
|
||||
self.assertFalse(
|
||||
self.plugin.supports(
|
||||
plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION,
|
||||
"DSA", 512)
|
||||
)
|
||||
self.assertFalse(
|
||||
self.plugin.supports(
|
||||
plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION,
|
||||
"RSA", 64)
|
||||
)
|
||||
|
||||
def test_generate_512_bit_RSA_key(self):
|
||||
generate_dto = plugin.GenerateDTO('rsa', 512, None, None)
|
||||
self.assertRaises(ValueError,
|
||||
self.plugin.generate_asymmetric,
|
||||
generate_dto,
|
||||
MagicMock(),
|
||||
MagicMock())
|
||||
|
||||
def test_generate_2048_bit_DSA_key(self):
|
||||
generate_dto = plugin.GenerateDTO('dsa', 2048, None, None)
|
||||
self.assertRaises(ValueError, self.plugin.generate_asymmetric,
|
||||
generate_dto,
|
||||
MagicMock(),
|
||||
MagicMock())
|
||||
|
||||
def test_generate_2048_bit_DSA_key_with_passphrase(self):
|
||||
generate_dto = plugin.GenerateDTO('dsa', 2048, None, 'Passphrase')
|
||||
self.assertRaises(ValueError, self.plugin.generate_asymmetric,
|
||||
generate_dto,
|
||||
MagicMock(),
|
||||
MagicMock())
|
||||
|
||||
def test_generate_asymmetric_1024_bit_key(self):
|
||||
generate_dto = plugin.GenerateDTO('rsa', 1024, None, None)
|
||||
|
||||
private_dto, public_dto, passwd_dto = self.plugin.generate_asymmetric(
|
||||
generate_dto, MagicMock(), MagicMock())
|
||||
|
||||
decrypt_dto = plugin.DecryptDTO(private_dto.cypher_text)
|
||||
private_dto = self.plugin.decrypt(decrypt_dto,
|
||||
MagicMock(),
|
||||
private_dto.kek_meta_extended,
|
||||
MagicMock())
|
||||
|
||||
decrypt_dto = plugin.DecryptDTO(public_dto.cypher_text)
|
||||
public_dto = self.plugin.decrypt(decrypt_dto,
|
||||
MagicMock(),
|
||||
public_dto.kek_meta_extended,
|
||||
MagicMock())
|
||||
|
||||
public_dto = RSA.importKey(public_dto)
|
||||
private_dto = RSA.importKey(private_dto)
|
||||
self.assertEqual(public_dto.size(), 1023)
|
||||
self.assertEqual(private_dto.size(), 1023)
|
||||
self.assertTrue(private_dto.has_private)
|
||||
|
||||
def test_generate_1024_bit_RSA_key_in_pem(self):
|
||||
generate_dto = plugin.GenerateDTO('rsa', 1024, None, 'changeme')
|
||||
|
||||
private_dto, public_dto, passwd_dto = \
|
||||
self.plugin.generate_asymmetric(generate_dto,
|
||||
MagicMock(),
|
||||
MagicMock())
|
||||
decrypt_dto = plugin.DecryptDTO(private_dto.cypher_text)
|
||||
private_dto = self.plugin.decrypt(decrypt_dto,
|
||||
MagicMock(),
|
||||
private_dto.kek_meta_extended,
|
||||
MagicMock())
|
||||
|
||||
private_dto = RSA.importKey(private_dto, 'changeme')
|
||||
self.assertTrue(private_dto.has_private())
|
||||
|
||||
def test_generate_1024_DSA_key_in_pem_and_reconstruct_key_der(self):
|
||||
generate_dto = plugin.GenerateDTO('dsa', 1024, None, None)
|
||||
|
||||
private_dto, public_dto, passwd_dto = \
|
||||
self.plugin.generate_asymmetric(generate_dto,
|
||||
MagicMock(),
|
||||
MagicMock())
|
||||
|
||||
decrypt_dto = plugin.DecryptDTO(private_dto.cypher_text)
|
||||
private_dto = self.plugin.decrypt(decrypt_dto,
|
||||
MagicMock(),
|
||||
private_dto.kek_meta_extended,
|
||||
MagicMock())
|
||||
|
||||
prv_seq = asn1.DerSequence()
|
||||
data = "\n".join(private_dto.strip().split("\n")
|
||||
[1:-1]).decode("base64")
|
||||
prv_seq.decode(data)
|
||||
p, q, g, y, x = prv_seq[1:]
|
||||
|
||||
private_dto = DSA.construct((y, g, p, q, x))
|
||||
self.assertTrue(private_dto.has_private())
|
||||
|
||||
def test_generate_128_bit_hmac_key(self):
|
||||
secret = models.Secret()
|
||||
secret.bit_length = 128
|
||||
secret.algorithm = "hmacsha256"
|
||||
generate_dto = plugin.GenerateDTO(
|
||||
secret.algorithm,
|
||||
secret.bit_length,
|
||||
None, None)
|
||||
response_dto = self.plugin.generate_symmetric(
|
||||
generate_dto,
|
||||
MagicMock(),
|
||||
MagicMock()
|
||||
)
|
||||
decrypt_dto = plugin.DecryptDTO(response_dto.cypher_text)
|
||||
key = self.plugin.decrypt(decrypt_dto, MagicMock(),
|
||||
response_dto.kek_meta_extended, MagicMock())
|
||||
self.assertEqual(len(key), 16)
|
||||
|
Loading…
x
Reference in New Issue
Block a user