From d864b87386a02fc53ab80cbc16f8f3475f978550 Mon Sep 17 00:00:00 2001 From: Arvind Tiwari Date: Tue, 18 Mar 2014 14:33:12 -0600 Subject: [PATCH] Extend crypto plugin to support more key types This change is part of "api-orders-add-more-types" BP phase 1 plan. As per phase 1 we will extend the order resource to support asymmetric and more symmetric keys. This specific change is to address the crypto plug-in and extension manager. Partial-BP: api-orders-add-more-types Change-Id: Idf0073c0703dff652e55e811538f86a8f7c7259b --- barbican/common/resources.py | 12 +- barbican/crypto/dogtag_crypto.py | 62 +++++- barbican/crypto/extension_manager.py | 114 ++++++++-- barbican/crypto/p11_crypto.py | 11 +- barbican/crypto/plugin.py | 182 +++++++++++++-- barbican/tests/crypto/test_dogtag_crypto.py | 41 +++- .../tests/crypto/test_extension_manager.py | 105 ++++++++- barbican/tests/crypto/test_p11_crypto.py | 20 +- barbican/tests/crypto/test_plugin.py | 209 +++++++++++++++--- 9 files changed, 656 insertions(+), 100 deletions(-) diff --git a/barbican/common/resources.py b/barbican/common/resources.py index 6b2f03679..0bdc0009d 100644 --- a/barbican/common/resources.py +++ b/barbican/common/resources.py @@ -71,10 +71,14 @@ def create_secret(data, tenant, crypto_manager, elif ok_to_generate: LOG.debug('Generating new secret...') - new_datum = crypto_manager.generate_data_encryption_key(new_secret, - content_type, - tenant, - kek_repo) + # TODO (atiwari): With new typed Order API proposal + # we need to translate new_secret to meta + # currently it is working as meta will have same attributes + new_datum = crypto_manager. \ + generate_symmetric_encryption_key(new_secret, + content_type, + tenant, + kek_repo) time_keeper.mark('after secret generate') else: diff --git a/barbican/crypto/dogtag_crypto.py b/barbican/crypto/dogtag_crypto.py index 22bbeb5b2..dbccfe046 100644 --- a/barbican/crypto/dogtag_crypto.py +++ b/barbican/crypto/dogtag_crypto.py @@ -26,6 +26,7 @@ from pki.kraclient import KRAClient from barbican.crypto import plugin from barbican.openstack.common.gettextutils import _ +from barbican.common import exception CONF = cfg.CONF @@ -56,6 +57,10 @@ CONF.register_group(dogtag_crypto_plugin_group) CONF.register_opts(dogtag_crypto_plugin_opts, group=dogtag_crypto_plugin_group) +class DogtagPluginAlgorithmException(exception.BarbicanException): + message = _("Invalid algorithm passed in") + + class DogtagCryptoPlugin(plugin.CryptoPluginBase): """Dogtag implementation of the crypto plugin with DRM as the backend""" @@ -138,7 +143,7 @@ class DogtagCryptoPlugin(plugin.CryptoPluginBase): encrypt_dto.unencrypted, key_algorithm=None, key_size=None) - return response.get_key_id(), None + return plugin.ResponseDTO(response.get_key_id(), None) def decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended, keystone_id): @@ -173,7 +178,7 @@ class DogtagCryptoPlugin(plugin.CryptoPluginBase): """ return kek_meta_dto - def generate(self, generate_dto, kek_meta_dto, keystone_id): + def generate_symmetric(self, generate_dto, kek_meta_dto, keystone_id): """ Generate a symmetric key @@ -189,18 +194,59 @@ class DogtagCryptoPlugin(plugin.CryptoPluginBase): key.SymKeyGenerationRequest.ENCRYPT_USAGE] client_key_id = uuid.uuid4().hex - response = self.keyclient.generate_symmetric_key( - client_key_id, generate_dto.algorithm.upper(), - generate_dto.bit_length, usages) - return response.get_key_id(), None + algorithm = self._map_algorithm(generate_dto.algorithm.lower()) - def supports(self, type_enum, algorithm=None, mode=None): + if algorithm is None: + raise DogtagPluginAlgorithmException + + response = self.keyclient.generate_symmetric_key( + client_key_id, + algorithm, + generate_dto.bit_length, + usages) + return plugin.ResponseDTO(response.get_key_id(), None) + + def generate_asymmetric(self, generate_dto, kek_meta_dto, keystone_id): + """ + Generate a asymmetric key + """ + raise NotImplementedError("Feature not implemented for dogtag crypto") + + def supports(self, type_enum, algorithm=None, bit_length=None, + mode=None): """ Specifies what operations the plugin supports """ if type_enum == plugin.PluginSupportTypes.ENCRYPT_DECRYPT: return True elif type_enum == plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION: - return True + return self._is_algorithm_supported(algorithm, + bit_length) + elif type_enum == plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION: + return False else: return False + + @staticmethod + def _map_algorithm(algorithm): + """ + Map Barbican algorithms to Dogtag plugin algorithms + """ + if algorithm == "aes": + return key.KeyClient.AES_ALGORITHM + elif algorithm == "des": + return key.KeyClient.DES_ALGORITHM + elif algorithm == "3des": + return key.KeyClient.DES3_ALGORITHM + else: + return None + + def _is_algorithm_supported(self, algorithm, bit_length=None): + """ + check if algorithm and bit length are supported + + For now, we will just check the algorithm. When dogtag adds a + call to check the bit length per algorithm, we can modify to + make that call + """ + return self._map_algorithm(algorithm) is not None diff --git a/barbican/crypto/extension_manager.py b/barbican/crypto/extension_manager.py index 3ade05955..508e9045d 100644 --- a/barbican/crypto/extension_manager.py +++ b/barbican/crypto/extension_manager.py @@ -295,8 +295,8 @@ class CryptoExtensionManager(named.NamedExtensionManager): else: raise CryptoPluginNotFound() - def generate_data_encryption_key(self, secret, content_type, tenant, - kek_repo): + def generate_symmetric_encryption_key(self, secret, content_type, tenant, + kek_repo): """Delegates generating a key to the first supported plugin. Note that this key can be used by clients for their encryption @@ -304,16 +304,8 @@ class CryptoExtensionManager(named.NamedExtensionManager): the plug-in key encryption process, and that encrypted datum is then returned from this method. """ - if len(self.extensions) < 1: - raise CryptoPluginNotFound() - - generation_type = self._determine_type(secret.algorithm) - for ext in self.extensions: - if ext.obj.supports(generation_type, secret.algorithm): - encrypting_plugin = ext.obj - break - else: - raise CryptoSupportedPluginNotFound() + encrypting_plugin = \ + self._determine_crypto_plugin(secret.algorithm) kek_datum, kek_meta_dto = self._find_or_create_kek_objects( encrypting_plugin, tenant, kek_repo) @@ -322,30 +314,108 @@ class CryptoExtensionManager(named.NamedExtensionManager): datum = models.EncryptedDatum(secret, kek_datum) datum.content_type = content_type - generate_dto = plugin_mod.GenerateDTO(generation_type, - secret.algorithm, + generate_dto = plugin_mod.GenerateDTO(secret.algorithm, secret.bit_length, - secret.mode) - # Create the encrypted secret. - datum.cypher_text, datum.kek_meta_extended =\ - encrypting_plugin.generate( - generate_dto, kek_meta_dto, tenant.keystone_id) + secret.mode, None) + # Create the encrypted meta. + response_dto = encrypting_plugin.generate_symmetric(generate_dto, + kek_meta_dto, + tenant.keystone_id) # Convert binary data into a text-based format. # TODO(jwood) Figure out by storing binary (BYTEA) data in Postgres # isn't working. - datum.cypher_text = base64.b64encode(datum.cypher_text) - + datum.cypher_text = base64.b64encode(response_dto.cypher_text) + datum.kek_meta_extended = response_dto.kek_meta_extended return datum + def generate_asymmetric_encryption_keys(self, meta, content_type, tenant, + kek_repo): + """Delegates generating a asymmteric keys to the first + supported plugin based on `meta`. meta will provide extra + information to help key generation. + Based on passpharse in meta this method will return a tuple + with two/three objects. + + Note that this key can be used by clients for their encryption + processes. This generated key is then be encrypted via + the plug-in key encryption process, and that encrypted datum + is then returned from this method. + """ + encrypting_plugin = \ + self._determine_crypto_plugin(meta.algorithm, + meta.bit_length, + meta.passphrase) + + kek_datum, kek_meta_dto = self._find_or_create_kek_objects( + encrypting_plugin, tenant, kek_repo) + + generate_dto = plugin_mod.GenerateDTO(meta.algorithm, + meta.bit_length, + None, meta.passphrase) + # generate the secret. + private_key_dto, public_key_dto, passwd_dto = \ + encrypting_plugin.generate_asymmetric( + generate_dto, + kek_meta_dto, + tenant.keystone_id) + + # Create an encrypted datum instances for each secret type + # and add the created cypher text. + priv_datum = models.EncryptedDatum(None, kek_datum) + priv_datum.content_type = content_type + priv_datum.cypher_text = base64.b64encode(private_key_dto.cypher_text) + priv_datum.kek_meta_extended = private_key_dto.kek_meta_extended + + public_datum = models.EncryptedDatum(None, kek_datum) + public_datum.content_type = content_type + public_datum.cypher_text = base64.b64encode(public_key_dto.cypher_text) + public_datum.kek_meta_extended = public_key_dto.kek_meta_extended + + passwd_datum = None + if passwd_dto: + passwd_datum = models.EncryptedDatum(None, kek_datum) + passwd_datum.content_type = content_type + passwd_datum.cypher_text = base64.b64encode(passwd_dto.cypher_text) + passwd_datum.kek_meta_extended = \ + passwd_dto.kek_meta_extended + + return (priv_datum, public_datum, passwd_datum) + def _determine_type(self, algorithm): - """Determines the type (symmetric only for now) based on algorithm""" + """Determines the type (symmetric and asymmetric for now) + based on algorithm""" symmetric_algs = plugin_mod.PluginSupportTypes.SYMMETRIC_ALGORITHMS + asymmetric_algs = plugin_mod.PluginSupportTypes.ASYMMETRIC_ALGORITHMS if algorithm.lower() in symmetric_algs: return plugin_mod.PluginSupportTypes.SYMMETRIC_KEY_GENERATION + elif algorithm.lower() in asymmetric_algs: + return plugin_mod.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION else: raise CryptoAlgorithmNotSupportedException(algorithm) + #TODO(atiwari): Use meta object instead of individual attribute + #This has to be done while integration rest resources + def _determine_crypto_plugin(self, algorithm, bit_length=None, + mode=None): + """Determines the generation type and encrypting plug-in + which supports the generation of secret based on + generation type""" + if len(self.extensions) < 1: + raise CryptoPluginNotFound() + + generation_type = self._determine_type(algorithm) + for ext in self.extensions: + if ext.obj.supports(generation_type, algorithm, + bit_length, + mode): + encrypting_plugin = ext.obj + break + else: + raise CryptoSupportedPluginNotFound() + + return encrypting_plugin + def _plugin_supports(self, plugin_inst, kek_metadata_tenant): """Tests for plugin support. diff --git a/barbican/crypto/p11_crypto.py b/barbican/crypto/p11_crypto.py index da7ecd7fb..9679bf5db 100644 --- a/barbican/crypto/p11_crypto.py +++ b/barbican/crypto/p11_crypto.py @@ -135,7 +135,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase): 'iv': base64.b64encode(iv) }) - return cyphertext, kek_meta_extended + return plugin.ResponseDTO(cyphertext, kek_meta_extended) def decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended, keystone_id): @@ -162,17 +162,22 @@ class P11CryptoPlugin(plugin.CryptoPluginBase): return kek_meta_dto - def generate(self, generate_dto, kek_meta_dto, keystone_id): + def generate_symmetric(self, generate_dto, kek_meta_dto, keystone_id): byte_length = generate_dto.bit_length / 8 rand = self.session.generateRandom(byte_length) if len(rand) != byte_length: raise P11CryptoPluginException() return self.encrypt(plugin.EncryptDTO(rand), kek_meta_dto, keystone_id) - def supports(self, type_enum, algorithm=None, mode=None): + def generate_asymmetric(self, generate_dto, kek_meta_dto, keystone_id): + raise NotImplementedError("Feature not implemented for PKCS11") + + def supports(self, type_enum, algorithm=None, bit_length=None, mode=None): if type_enum == plugin.PluginSupportTypes.ENCRYPT_DECRYPT: return True elif type_enum == plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION: return True + elif type_enum == plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION: + return False else: return False diff --git a/barbican/crypto/plugin.py b/barbican/crypto/plugin.py index 80ef0e963..5a0a99f3d 100644 --- a/barbican/crypto/plugin.py +++ b/barbican/crypto/plugin.py @@ -16,12 +16,17 @@ import abc from Crypto.Cipher import AES +from Crypto.PublicKey import RSA +from Crypto.PublicKey import DSA +from Crypto.Util import asn1 from Crypto import Random from oslo.config import cfg import six +from barbican.common import utils from barbican.openstack.common.gettextutils import _ +LOG = utils.getLogger(__name__) CONF = cfg.CONF @@ -41,7 +46,13 @@ class PluginSupportTypes(object): ENCRYPT_DECRYPT = "ENCRYPT_DECRYPT" SYMMETRIC_KEY_GENERATION = "SYMMETRIC_KEY_GENERATION" # A list of symmetric algorithms that are used to determine type of key gen - SYMMETRIC_ALGORITHMS = ['aes', 'des'] + SYMMETRIC_ALGORITHMS = ['aes', 'des', '3des', 'hmacsha1', + 'hmacsha256', 'hmacsha384', 'hmacsha512'] + SYMMETRIC_KEY_LENGTHS = [64, 128, 192, 256] + + ASYMMETRIC_KEY_GENERATION = "ASYMMETRIC_KEY_GENERATION" + ASYMMETRIC_ALGORITHMS = ['rsa', 'dsa'] + ASYMMETRIC_KEY_LENGTHS = [1024, 2048, 4096] class KEKMetaDTO(object): @@ -73,11 +84,22 @@ class GenerateDTO(object): is passed in instances of this object. """ - def __init__(self, generation_type, algorithm, bit_length, mode): - self.generation_type = generation_type + def __init__(self, algorithm, bit_length, mode, passphrase=None): self.algorithm = algorithm self.bit_length = bit_length self.mode = mode + self.passphrase = passphrase + + +class ResponseDTO(object): + """ + Data transfer object for secret generation response. + + """ + + def __init__(self, cypher_text, kek_meta_extended=None): + self.cypher_text = cypher_text + self.kek_meta_extended = kek_meta_extended class DecryptDTO(object): @@ -126,6 +148,8 @@ def indicate_bind_completed(kek_meta_dto, kek_datum): class CryptoPluginBase(object): """Base class for Crypto plugins.""" + __metaclass__ = abc.ABCMeta + @abc.abstractmethod def encrypt(self, encrypt_dto, kek_meta_dto, keystone_id): """Encrypt unencrypted data in the context of the provided tenant. @@ -134,10 +158,10 @@ class CryptoPluginBase(object): to be encrypted. :param kek_meta_dto: Key encryption key metadata to use for encryption. :param keystone_id: keystone_id associated with the unencrypted data. - :returns: encrypted data and kek_meta_extended, the former the - resultant cypher text, the latter being optional per-secret metadata - needed to decrypt (over and above the per-tenant metadata managed - outside of the plugins) + :returns: An object of type ResponseDTO containing encrypted data and + kek_meta_extended, the former the resultant cypher text, the latter + being optional per-secret metadata needed to decrypt (over and above + the per-tenant metadata managed outside of the plugins) """ raise NotImplementedError # pragma: no cover @@ -181,7 +205,7 @@ class CryptoPluginBase(object): raise NotImplementedError # pragma: no cover @abc.abstractmethod - def generate(self, generate_dto, kek_meta_dto, keystone_id): + def generate_symmetric(self, generate_dto, kek_meta_dto, keystone_id): """ Generate a new key. @@ -191,15 +215,36 @@ class CryptoPluginBase(object): bit_length, algorithm and mode :param kek_meta_dto: Key encryption key metadata to use for decryption :param keystone_id: keystone_id associated with the data. - :returns: encrypted data and kek_meta_extended, the former the - resultant cypher text, the latter being optional per-secret metadata - needed to decrypt (over and above the per-tenant metadata managed - outside of the plugins) + :returns: An object of type ResponseDTO containing encrypted data and + kek_meta_extended, the former the resultant cypher text, the latter + being optional per-secret metadata needed to decrypt (over and above + the per-tenant metadata managed outside of the plugins) """ raise NotImplementedError # pragma: no cover @abc.abstractmethod - def supports(self, type_enum, algorithm=None, mode=None): + def generate_asymmetric(self, generate_dto, + kek_meta_dto, keystone_id): + """Create a new asymmetric key. + + :param generate_dto: data transfer object for the record + associated with this generation request. Some relevant + parameters can be extracted from this object, including + bit_length, algorithm and passphrase + :param kek_meta_dto: Key encryption key metadata to use for decryption + :param keystone_id: keystone_id associated with the data. + :returns: A tuple containing objects for private_key, public_key and + optionally one for passphrase. The objects will be of type ResponseDTO. + Each object containing encrypted data and kek_meta_extended, the former + the resultant cypher text, the latter being optional per-secret + metadata needed to decrypt (over and above the per-tenant metadata + managed outside of the plugins) + """ + raise NotImplementedError # pragma: no cover + + @abc.abstractmethod + def supports(self, type_enum, algorithm=None, bit_length=None, + mode=None): """Used to determine if the plugin supports the requested operation. :param type_enum: Enumeration from PluginSupportsType class @@ -238,7 +283,7 @@ class SimpleCryptoPlugin(CryptoPluginBase): cyphertext = iv + encryptor.encrypt(padded_data) - return cyphertext, None + return ResponseDTO(cyphertext, None) def decrypt(self, encrypted_dto, kek_meta_dto, kek_meta_extended, keystone_id): @@ -256,15 +301,116 @@ class SimpleCryptoPlugin(CryptoPluginBase): kek_meta_dto.plugin_meta = None return kek_meta_dto - def generate(self, generate_dto, kek_meta_dto, keystone_id): + def generate_symmetric(self, generate_dto, kek_meta_dto, keystone_id): byte_length = int(generate_dto.bit_length) / 8 unencrypted = Random.get_random_bytes(byte_length) - return self.encrypt(EncryptDTO(unencrypted), kek_meta_dto, keystone_id) - def supports(self, type_enum, algorithm=None, mode=None): + return self.encrypt(EncryptDTO(unencrypted), + kek_meta_dto, + keystone_id) + + def generate_asymmetric(self, generate_dto, kek_meta_dto, keystone_id): + """Generate asymmetric keys based on below rule + - RSA, with passphrase (supported) + - RSA, without passphrase (supported) + - DSA, without passphrase (supported) + - DSA, with passphrase (not supported) + + Note: PyCrypto is not capable of serializing DSA + keys and DER formated keys. Such keys will be + serialized to Base64 PEM to store in DB. + + TODO (atiwari/reaperhulk): PyCrypto is not capable to serialize + DSA keys and DER formated keys, later we need to pick better + crypto lib. + """ + if generate_dto.algorithm is None\ + or generate_dto.algorithm.lower() == 'rsa': + private_key = RSA.generate( + generate_dto.bit_length, None, None, 65537) + elif generate_dto.algorithm.lower() == 'dsa': + private_key = DSA.generate(generate_dto.bit_length, None, None) + + public_key = private_key.publickey() + + #Note (atiwari): key wrapping format PEM only supported + if generate_dto.algorithm.lower() == 'rsa': + public_key, private_key = self._wrap_key(public_key, private_key, + generate_dto.passphrase) + if generate_dto.algorithm.lower() == 'dsa': + if generate_dto.passphrase: + raise ValueError('Passphrase not supported for DSA key') + public_key, private_key = self._serialize_dsa_key(public_key, + private_key) + private_dto = self.encrypt(EncryptDTO(private_key), + kek_meta_dto, + keystone_id) + + public_dto = self.encrypt(EncryptDTO(public_key), + kek_meta_dto, + keystone_id) + + passphrase_dto = None + if generate_dto.passphrase: + passphrase_dto = self.encrypt(EncryptDTO(generate_dto.passphrase), + kek_meta_dto, + keystone_id) + + return private_dto, public_dto, passphrase_dto + + def supports(self, type_enum, algorithm=None, bit_length=None, + mode=None): if type_enum == PluginSupportTypes.ENCRYPT_DECRYPT: return True - elif type_enum == PluginSupportTypes.SYMMETRIC_KEY_GENERATION: + + if type_enum == PluginSupportTypes.SYMMETRIC_KEY_GENERATION: + return self._is_algorithm_supported(algorithm, + bit_length) + elif type_enum == PluginSupportTypes.ASYMMETRIC_KEY_GENERATION: + return self._is_algorithm_supported(algorithm, + bit_length) + else: + return False + + def _wrap_key(self, public_key, private_key, + passphrase): + pkcs = 8 + key_wrap_format = 'PEM' + + private_key = private_key.exportKey(key_wrap_format, passphrase, pkcs) + public_key = public_key.exportKey() + + return (public_key, private_key) + + def _serialize_dsa_key(self, public_key, private_key): + + pub_seq = asn1.DerSequence() + pub_seq[:] = [0, public_key.p, public_key.q, + public_key.g, public_key.y] + public_key = "-----BEGIN DSA PUBLIC KEY-----\n%s"\ + "-----END DSA PUBLIC KEY-----" % pub_seq.encode().encode("base64") + + prv_seq = asn1.DerSequence() + prv_seq[:] = [0, private_key.p, private_key.q, + private_key.g, private_key.y, private_key.x] + private_key = "-----BEGIN DSA PRIVATE KEY-----\n%s"\ + "-----END DSA PRIVATE KEY-----" % prv_seq.encode().encode("base64") + + return (public_key, private_key) + + def _is_algorithm_supported(self, algorithm=None, bit_length=None): + """ + check if algorithm and bit_length combination is supported + + """ + if algorithm is None or bit_length is None: + return False + + if algorithm.lower() in PluginSupportTypes.SYMMETRIC_ALGORITHMS \ + and bit_length in PluginSupportTypes.SYMMETRIC_KEY_LENGTHS: + return True + elif algorithm.lower() in PluginSupportTypes.ASYMMETRIC_ALGORITHMS \ + and bit_length in PluginSupportTypes.ASYMMETRIC_KEY_LENGTHS: return True else: return False diff --git a/barbican/tests/crypto/test_dogtag_crypto.py b/barbican/tests/crypto/test_dogtag_crypto.py index b71d650bd..571eca836 100644 --- a/barbican/tests/crypto/test_dogtag_crypto.py +++ b/barbican/tests/crypto/test_dogtag_crypto.py @@ -21,6 +21,7 @@ import testtools try: from barbican.crypto import plugin as plugin_import from barbican.crypto.dogtag_crypto import DogtagCryptoPlugin + from barbican.crypto.dogtag_crypto import DogtagPluginAlgorithmException from barbican.model import models imports_ok = True except: @@ -66,7 +67,7 @@ class WhenTestingDogtagCryptoPlugin(testtools.TestCase): secret.algorithm, secret.bit_length, None) - _encrypted, _kek_ext = self.plugin.generate( + self.plugin.generate_symmetric( generate_dto, mock.MagicMock(), mock.MagicMock() @@ -78,6 +79,25 @@ class WhenTestingDogtagCryptoPlugin(testtools.TestCase): secret.bit_length, mock.ANY) + def test_generate_non_supported_algorithm(self): + if not imports_ok: + self.skipTest("Dogtag imports not available") + secret = models.Secret() + secret.bit_length = 128 + secret.algorithm = "hmacsha256" + generate_dto = plugin_import.GenerateDTO( + plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, + secret.algorithm, + secret.bit_length, + None) + self.assertRaises( + DogtagPluginAlgorithmException, + self.plugin.generate_symmetric, + generate_dto, + mock.MagicMock(), + mock.MagicMock() + ) + def test_raises_error_with_no_pem_path(self): if not imports_ok: self.skipTest("Dogtag imports not available") @@ -156,6 +176,25 @@ class WhenTestingDogtagCryptoPlugin(testtools.TestCase): ) ) + def test_supports_symmetric_hmacsha256_key_generation(self): + if not imports_ok: + self.skipTest("Dogtag imports not available") + self.assertFalse( + self.plugin.supports( + plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, + 'hmacsha256', 128 + ) + ) + + def test_supports_asymmetric_key_generation(self): + if not imports_ok: + self.skipTest("Dogtag imports not available") + self.assertFalse( + self.plugin.supports( + plugin_import.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION + ) + ) + def test_does_not_support_unknown_type(self): if not imports_ok: self.skipTest("Dogtag imports not available") diff --git a/barbican/tests/crypto/test_extension_manager.py b/barbican/tests/crypto/test_extension_manager.py index fb54ca53b..ee3dd5580 100644 --- a/barbican/tests/crypto/test_extension_manager.py +++ b/barbican/tests/crypto/test_extension_manager.py @@ -19,7 +19,8 @@ import testtools from barbican.crypto import extension_manager as em from barbican.crypto import mime_types as mt -from barbican.crypto.plugin import CryptoPluginBase, PluginSupportTypes +from barbican.crypto.plugin import CryptoPluginBase, PluginSupportTypes, \ + SimpleCryptoPlugin class TestSupportsCryptoPlugin(CryptoPluginBase): @@ -34,10 +35,15 @@ class TestSupportsCryptoPlugin(CryptoPluginBase): def bind_kek_metadata(self, kek_meta_dto): return None - def generate(self, generate_dto, type_enum, kek_meta_dto, keystone_id): + def generate_symmetric(self, generate_dto, type_enum, + kek_meta_dto, keystone_id): raise NotImplementedError() - def supports(self, type_enum, algorithm=None, mode=None): + def generate_asymmetric(self, generate_dto, type_enum, + kek_meta_dto, keystone_id): + raise NotImplementedError("Feature not implemented for PKCS11") + + def supports(self, type_enum, algorithm=None, bit_length=None, mode=None): return False @@ -234,6 +240,13 @@ class WhenTestingCryptoExtensionManager(testtools.TestCase): 'faux_alg', ) + def test_create_asymmetric_supported_algorithm(self): + skg = PluginSupportTypes.ASYMMETRIC_KEY_GENERATION + self.assertEqual(skg, self.manager._determine_type('RSA')) + self.assertEqual(skg, self.manager._determine_type('rsa')) + self.assertEqual(skg, self.manager._determine_type('DSA')) + self.assertEqual(skg, self.manager._determine_type('dsa')) + def test_encrypt_no_plugin_found(self): self.manager.extensions = [] self.assertRaises( @@ -295,7 +308,7 @@ class WhenTestingCryptoExtensionManager(testtools.TestCase): self.manager.extensions = [] self.assertRaises( em.CryptoPluginNotFound, - self.manager.generate_data_encryption_key, + self.manager.generate_symmetric_encryption_key, mock.MagicMock(), mock.MagicMock(), mock.MagicMock(), @@ -308,7 +321,7 @@ class WhenTestingCryptoExtensionManager(testtools.TestCase): self.manager.extensions = [plugin_mock] self.assertRaises( em.CryptoSupportedPluginNotFound, - self.manager.generate_data_encryption_key, + self.manager.generate_symmetric_encryption_key, mock.MagicMock(algorithm='AES'), mock.MagicMock(), mock.MagicMock(), @@ -338,3 +351,85 @@ class WhenTestingCryptoExtensionManager(testtools.TestCase): kek_repo ) kek_repo.save.assert_called_once() + + def generate_asymmetric_encryption_keys_no_plugin_found(self): + self.manager.extensions = [] + self.assertRaises( + em.CryptoPluginNotFound, + self.manager.generate_asymmetric_encryption_keys, + mock.MagicMock(), + mock.MagicMock(), + mock.MagicMock(), + mock.MagicMock(), + ) + + def generate_asymmetric_encryption_keys_no_supported_plugin(self): + plugin = TestSupportsCryptoPlugin() + plugin_mock = mock.MagicMock(obj=plugin) + self.manager.extensions = [plugin_mock] + self.assertRaises( + em.CryptoSupportedPluginNotFound, + self.manager.generate_asymmetric_encryption_keys, + mock.MagicMock(algorithm='DSA'), + mock.MagicMock(), + mock.MagicMock(), + mock.MagicMock() + ) + + def generate_asymmetric_encryption_rsa_keys_ensure_encoding(self): + plugin = SimpleCryptoPlugin() + plugin_mock = mock.MagicMock(obj=plugin) + self.manager.extensions = [plugin_mock] + + meta = mock.MagicMock(algorithm='rsa', + bit_length=1024, + passphrase=None) + + private_datum, public_datum, passphrase_datum = \ + self.manager.generate_asymmetric_encryption_keys(meta, + mock.MagicMock(), + mock.MagicMock(), + mock.MagicMock()) + self.assertIsNotNone(private_datum) + self.assertIsNotNone(public_datum) + self.assertIsNone(passphrase_datum) + + try: + base64.b64decode(private_datum.cypher_text) + base64.b64decode(public_datum.cypher_text) + if passphrase_datum: + base64.b64decode(passphrase_datum.cypher_text) + isB64Encoding = True + except Exception: + isB64Encoding = False + + self.assertTrue(isB64Encoding) + + def generate_asymmetric_encryption_dsa_keys_ensure_encoding(self): + plugin = SimpleCryptoPlugin() + plugin_mock = mock.MagicMock(obj=plugin) + self.manager.extensions = [plugin_mock] + + meta = mock.MagicMock(algorithm='rsa', + bit_length=1024, + passphrase=None) + + private_datum, public_datum, passphrase_datum = \ + self.manager.generate_asymmetric_encryption_keys(meta, + mock.MagicMock(), + mock.MagicMock(), + mock.MagicMock()) + self.assertIsNotNone(private_datum) + self.assertIsNotNone(public_datum) + self.assertIsNone(passphrase_datum) + + try: + base64.b64decode(private_datum.cypher_text) + base64.b64decode(public_datum.cypher_text) + if passphrase_datum: + base64.b64decode(passphrase_datum.cypher_text) + isB64Encoding = True + except Exception: + isB64Encoding = False + + self.assertTrue(isB64Encoding) diff --git a/barbican/tests/crypto/test_p11_crypto.py b/barbican/tests/crypto/test_p11_crypto.py index 5b9ccdd68..36fa7f3d5 100644 --- a/barbican/tests/crypto/test_p11_crypto.py +++ b/barbican/tests/crypto/test_p11_crypto.py @@ -51,11 +51,10 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase): secret.bit_length = 128 secret.algorithm = "AES" generate_dto = plugin_import.GenerateDTO( - plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, secret.algorithm, secret.bit_length, - None) - encrypted, kek_ext = self.plugin.generate( + None, None) + self.plugin.generate_symmetric( generate_dto, mock.MagicMock(), mock.MagicMock() @@ -68,13 +67,12 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase): secret.bit_length = 192 secret.algorithm = "AES" generate_dto = plugin_import.GenerateDTO( - plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, secret.algorithm, secret.bit_length, - None) + None, None) self.assertRaises( p11_crypto.P11CryptoPluginException, - self.plugin.generate, + self.plugin.generate_symmetric, generate_dto, mock.MagicMock(), mock.MagicMock() @@ -168,16 +166,16 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase): self.p11_mock.Mechanism.return_value = mech self.session.encrypt.return_value = [1, 2, 3, 4, 5] encrypt_dto = plugin_import.EncryptDTO(payload) - cyphertext, kek_meta_extended = self.plugin.encrypt(encrypt_dto, - mock.MagicMock(), - mock.MagicMock()) + response_dto = self.plugin.encrypt(encrypt_dto, + mock.MagicMock(), + mock.MagicMock()) self.session.encrypt.assert_called_once_with(key, payload, mech) - self.assertEqual(b'\x01\x02\x03\x04\x05', cyphertext) + self.assertEqual(b'\x01\x02\x03\x04\x05', response_dto.cypher_text) self.assertEqual('{"iv": "AQIDBAUGBwgJCgsMDQ4PEA=="}', - kek_meta_extended) + response_dto.kek_meta_extended) def test_decrypt(self): key = 'key1' diff --git a/barbican/tests/crypto/test_plugin.py b/barbican/tests/crypto/test_plugin.py index 71dbb55bd..024f66dcb 100644 --- a/barbican/tests/crypto/test_plugin.py +++ b/barbican/tests/crypto/test_plugin.py @@ -14,6 +14,10 @@ # limitations under the License. from Crypto import Random +from Crypto.PublicKey import RSA +from Crypto.PublicKey import DSA +from Crypto.Util import asn1 + from mock import MagicMock import testtools @@ -40,10 +44,15 @@ class TestCryptoPlugin(plugin.CryptoPluginBase): kek_meta_dto.plugin_meta = None return kek_meta_dto - def generate(self, generate_dto, kek_meta_dto, keystone_id): - return "encrypted insecure key", None + def generate_symmetric(self, generate_dto, kek_meta_dto, keystone_id): + return plugin.ResponseDTO("encrypted insecure key", None) - def supports(self, type_enum, algorithm=None, mode=None): + def generate_asymmetric(self, generate_dto, kek_meta_dto, keystone_id): + return (plugin.ResponseDTO('insecure_private_key', None), + plugin.ResponseDTO('insecure_public_key', None), + None) + + def supports(self, type_enum, algorithm=None, bit_length=None, mode=None): if type_enum == plugin.PluginSupportTypes.ENCRYPT_DECRYPT: return True elif type_enum == plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION: @@ -100,22 +109,25 @@ class WhenTestingSimpleCryptoPlugin(testtools.TestCase): def test_byte_string_encryption(self): unencrypted = b'some_secret' encrypt_dto = plugin.EncryptDTO(unencrypted) - encrypted, kek_ext = self.plugin.encrypt(encrypt_dto, - MagicMock(), - MagicMock()) - decrypt_dto = plugin.DecryptDTO(encrypted) + response_dto = self.plugin.encrypt(encrypt_dto, + MagicMock(), + MagicMock()) + decrypt_dto = plugin.DecryptDTO(response_dto.cypher_text) decrypted = self.plugin.decrypt(decrypt_dto, MagicMock(), - kek_ext, MagicMock()) + response_dto.kek_meta_extended, + MagicMock()) self.assertEqual(unencrypted, decrypted) def test_random_bytes_encryption(self): unencrypted = Random.get_random_bytes(10) encrypt_dto = plugin.EncryptDTO(unencrypted) - encrypted, kek_meta_ext = self.plugin.encrypt(encrypt_dto, - MagicMock(), MagicMock()) - decrypt_dto = plugin.DecryptDTO(encrypted) + response_dto = self.plugin.encrypt(encrypt_dto, + MagicMock(), + MagicMock()) + decrypt_dto = plugin.DecryptDTO(response_dto.cypher_text) decrypted = self.plugin.decrypt(decrypt_dto, MagicMock(), - kek_meta_ext, MagicMock()) + response_dto.kek_meta_extended, + MagicMock()) self.assertEqual(unencrypted, decrypted) def test_generate_256_bit_key(self): @@ -123,18 +135,17 @@ class WhenTestingSimpleCryptoPlugin(testtools.TestCase): secret.bit_length = 256 secret.algorithm = "AES" generate_dto = plugin.GenerateDTO( - plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, secret.algorithm, secret.bit_length, - secret.mode) - encrypted, kek_ext = self.plugin.generate( + secret.mode, None) + response_dto = self.plugin.generate_symmetric( generate_dto, MagicMock(), MagicMock() ) - decrypt_dto = plugin.DecryptDTO(encrypted) + decrypt_dto = plugin.DecryptDTO(response_dto.cypher_text) key = self.plugin.decrypt(decrypt_dto, MagicMock(), - kek_ext, MagicMock()) + response_dto.kek_meta_extended, MagicMock()) self.assertEqual(len(key), 32) def test_generate_192_bit_key(self): @@ -142,18 +153,17 @@ class WhenTestingSimpleCryptoPlugin(testtools.TestCase): secret.bit_length = 192 secret.algorithm = "AES" generate_dto = plugin.GenerateDTO( - plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, secret.algorithm, secret.bit_length, - None) - encrypted, kek_ext = self.plugin.generate( + None, None) + response_dto = self.plugin.generate_symmetric( generate_dto, MagicMock(), MagicMock() ) - decrypt_dto = plugin.DecryptDTO(encrypted) + decrypt_dto = plugin.DecryptDTO(response_dto.cypher_text) key = self.plugin.decrypt(decrypt_dto, MagicMock(), - kek_ext, MagicMock()) + response_dto.kek_meta_extended, MagicMock()) self.assertEqual(len(key), 24) def test_generate_128_bit_key(self): @@ -161,18 +171,17 @@ class WhenTestingSimpleCryptoPlugin(testtools.TestCase): secret.bit_length = 128 secret.algorithm = "AES" generate_dto = plugin.GenerateDTO( - plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, secret.algorithm, secret.bit_length, - None) - encrypted, kek_ext = self.plugin.generate( + None, None) + response_dto = self.plugin.generate_symmetric( generate_dto, MagicMock(), MagicMock() ) - decrypt_dto = plugin.DecryptDTO(encrypted) + decrypt_dto = plugin.DecryptDTO(response_dto.cypher_text) key = self.plugin.decrypt(decrypt_dto, MagicMock(), - kek_ext, MagicMock()) + response_dto.kek_meta_extended, MagicMock()) self.assertEqual(len(key), 16) def test_supports_encrypt_decrypt(self): @@ -183,7 +192,26 @@ class WhenTestingSimpleCryptoPlugin(testtools.TestCase): def test_supports_symmetric_key_generation(self): self.assertTrue( self.plugin.supports( - plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION) + plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, 'AES', 64) + ) + self.assertFalse( + self.plugin.supports( + plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, 'AES') + ) + self.assertTrue( + self.plugin.supports( + plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, + 'hmacsha512', 128) + ) + self.assertFalse( + self.plugin.supports( + plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, + 'hmacsha512', 12) + ) + self.assertFalse( + self.plugin.supports( + plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, + 'Camillia', 128) ) def test_does_not_support_unknown_type(self): @@ -199,3 +227,128 @@ class WhenTestingSimpleCryptoPlugin(testtools.TestCase): self.assertEqual(kek_metadata_dto.bit_length, 128) self.assertEqual(kek_metadata_dto.mode, 'cbc') self.assertIsNone(kek_metadata_dto.plugin_meta) + + def test_supports_asymmetric_key_generation(self): + self.assertTrue( + self.plugin.supports( + plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION, + 'DSA', 1024) + ) + self.assertTrue( + self.plugin.supports( + plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION, + "RSA", 1024) + ) + self.assertFalse( + self.plugin.supports( + plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION, + "DSA", 512) + ) + self.assertFalse( + self.plugin.supports( + plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION, + "RSA", 64) + ) + + def test_generate_512_bit_RSA_key(self): + generate_dto = plugin.GenerateDTO('rsa', 512, None, None) + self.assertRaises(ValueError, + self.plugin.generate_asymmetric, + generate_dto, + MagicMock(), + MagicMock()) + + def test_generate_2048_bit_DSA_key(self): + generate_dto = plugin.GenerateDTO('dsa', 2048, None, None) + self.assertRaises(ValueError, self.plugin.generate_asymmetric, + generate_dto, + MagicMock(), + MagicMock()) + + def test_generate_2048_bit_DSA_key_with_passphrase(self): + generate_dto = plugin.GenerateDTO('dsa', 2048, None, 'Passphrase') + self.assertRaises(ValueError, self.plugin.generate_asymmetric, + generate_dto, + MagicMock(), + MagicMock()) + + def test_generate_asymmetric_1024_bit_key(self): + generate_dto = plugin.GenerateDTO('rsa', 1024, None, None) + + private_dto, public_dto, passwd_dto = self.plugin.generate_asymmetric( + generate_dto, MagicMock(), MagicMock()) + + decrypt_dto = plugin.DecryptDTO(private_dto.cypher_text) + private_dto = self.plugin.decrypt(decrypt_dto, + MagicMock(), + private_dto.kek_meta_extended, + MagicMock()) + + decrypt_dto = plugin.DecryptDTO(public_dto.cypher_text) + public_dto = self.plugin.decrypt(decrypt_dto, + MagicMock(), + public_dto.kek_meta_extended, + MagicMock()) + + public_dto = RSA.importKey(public_dto) + private_dto = RSA.importKey(private_dto) + self.assertEqual(public_dto.size(), 1023) + self.assertEqual(private_dto.size(), 1023) + self.assertTrue(private_dto.has_private) + + def test_generate_1024_bit_RSA_key_in_pem(self): + generate_dto = plugin.GenerateDTO('rsa', 1024, None, 'changeme') + + private_dto, public_dto, passwd_dto = \ + self.plugin.generate_asymmetric(generate_dto, + MagicMock(), + MagicMock()) + decrypt_dto = plugin.DecryptDTO(private_dto.cypher_text) + private_dto = self.plugin.decrypt(decrypt_dto, + MagicMock(), + private_dto.kek_meta_extended, + MagicMock()) + + private_dto = RSA.importKey(private_dto, 'changeme') + self.assertTrue(private_dto.has_private()) + + def test_generate_1024_DSA_key_in_pem_and_reconstruct_key_der(self): + generate_dto = plugin.GenerateDTO('dsa', 1024, None, None) + + private_dto, public_dto, passwd_dto = \ + self.plugin.generate_asymmetric(generate_dto, + MagicMock(), + MagicMock()) + + decrypt_dto = plugin.DecryptDTO(private_dto.cypher_text) + private_dto = self.plugin.decrypt(decrypt_dto, + MagicMock(), + private_dto.kek_meta_extended, + MagicMock()) + + prv_seq = asn1.DerSequence() + data = "\n".join(private_dto.strip().split("\n") + [1:-1]).decode("base64") + prv_seq.decode(data) + p, q, g, y, x = prv_seq[1:] + + private_dto = DSA.construct((y, g, p, q, x)) + self.assertTrue(private_dto.has_private()) + + def test_generate_128_bit_hmac_key(self): + secret = models.Secret() + secret.bit_length = 128 + secret.algorithm = "hmacsha256" + generate_dto = plugin.GenerateDTO( + secret.algorithm, + secret.bit_length, + None, None) + response_dto = self.plugin.generate_symmetric( + generate_dto, + MagicMock(), + MagicMock() + ) + decrypt_dto = plugin.DecryptDTO(response_dto.cypher_text) + key = self.plugin.decrypt(decrypt_dto, MagicMock(), + response_dto.kek_meta_extended, MagicMock()) + self.assertEqual(len(key), 16)