diff --git a/barbican/plugin/crypto/p11_crypto.py b/barbican/plugin/crypto/p11_crypto.py index ff434bb3d..3f3ab7688 100644 --- a/barbican/plugin/crypto/p11_crypto.py +++ b/barbican/plugin/crypto/p11_crypto.py @@ -72,6 +72,9 @@ p11_crypto_plugin_opts = [ cfg.StrOpt('plugin_name', help=u._('User friendly plugin name'), default='PKCS11 HSM'), + cfg.BoolOpt('generate_iv', + help=u._('Flag for plugin generated iv case'), + default=False), ] CONF.register_group(p11_crypto_plugin_group) CONF.register_opts(p11_crypto_plugin_opts, group=p11_crypto_plugin_group) @@ -286,6 +289,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase): ffi=ffi, algorithm=plugin_conf.algorithm, seed_random_buffer=seed_random_buffer, + generate_iv=plugin_conf.generate_iv, ) def _reinitialize_pkcs11(self): diff --git a/barbican/plugin/crypto/pkcs11.py b/barbican/plugin/crypto/pkcs11.py index 1bf952cbd..755546a04 100644 --- a/barbican/plugin/crypto/pkcs11.py +++ b/barbican/plugin/crypto/pkcs11.py @@ -325,7 +325,8 @@ def build_ffi(): class PKCS11(object): def __init__(self, library_path, login_passphrase, rw_session, slot_id, ffi=None, algorithm='CKM_AES_GCM', - seed_random_buffer=None): + seed_random_buffer=None, + generate_iv=None): self.ffi = ffi or build_ffi() self.lib = self.ffi.dlopen(library_path) rv = self.lib.C_Initialize(self.ffi.NULL) @@ -341,6 +342,7 @@ class PKCS11(object): self.blocksize = 16 self.noncesize = 12 self.gcmtagsize = 16 + self.generate_iv = generate_iv # Validate configuration and RNG session = self.get_session() @@ -391,21 +393,33 @@ class PKCS11(object): return key def encrypt(self, key, pt_data, session): - ck_mechanism = self._build_gcm_mechanism() + iv = None + if self.generate_iv: + iv = self._generate_random(self.noncesize, session) + ck_mechanism = self._build_gcm_mechanism(iv) rv = self.lib.C_EncryptInit(session, ck_mechanism.mech, key) self._check_error(rv) pt_len = len(pt_data) - ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize * 2) + if self.generate_iv: + ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize) + else: + ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize * 2) ct = self.ffi.new("CK_BYTE[{0}]".format(ct_len[0])) rv = self.lib.C_Encrypt(session, pt_data, pt_len, ct, ct_len) self._check_error(rv) - # HSM-generated IVs are appended to the end of the ciphertext - return { - "iv": self.ffi.buffer(ct, ct_len[0])[-self.gcmtagsize:], - "ct": self.ffi.buffer(ct, ct_len[0])[:-self.gcmtagsize] - } + if self.generate_iv: + return { + "iv": self.ffi.buffer(iv)[:], + "ct": self.ffi.buffer(ct, ct_len[0])[:] + } + else: + # HSM-generated IVs are appended to the end of the ciphertext + return { + "iv": self.ffi.buffer(ct, ct_len[0])[-self.gcmtagsize:], + "ct": self.ffi.buffer(ct, ct_len[0])[:-self.gcmtagsize] + } def decrypt(self, key, iv, ct_data, session): iv = self.ffi.new("CK_BYTE[{0}]".format(len(iv)), iv) diff --git a/barbican/tests/plugin/crypto/test_pkcs11.py b/barbican/tests/plugin/crypto/test_pkcs11.py index 56e272422..5b538dc06 100644 --- a/barbican/tests/plugin/crypto/test_pkcs11.py +++ b/barbican/tests/plugin/crypto/test_pkcs11.py @@ -108,7 +108,11 @@ class WhenTestingPKCS11(utils.BaseTestCase): return pkcs11.CKR_OK def _encrypt(self, session, pt, pt_len, ct, ct_len): - self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * (self.pkcs11.gcmtagsize * 2) + if self.pkcs11.generate_iv: + self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * self.pkcs11.gcmtagsize + else: + self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * (self.pkcs11.gcmtagsize + * 2) return pkcs11.CKR_OK def _decrypt(self, session, ct, ct_len, pt, pt_len): @@ -232,8 +236,9 @@ class WhenTestingPKCS11(utils.BaseTestCase): mock.MagicMock(), mock.MagicMock(), encrypt=True, master_key=True) - def test_encrypt(self): + def test_encrypt_with_no_iv_generation(self): pt = b'0123456789ABCDEF' + self.pkcs11.generate_iv = False ct = self.pkcs11.encrypt(mock.MagicMock(), pt, mock.MagicMock()) self.assertEqual(ct['ct'][:len(pt)], pt[::-1]) @@ -243,6 +248,18 @@ class WhenTestingPKCS11(utils.BaseTestCase): self.assertEqual(1, self.lib.C_EncryptInit.call_count) self.assertEqual(1, self.lib.C_Encrypt.call_count) + def test_encrypt_with_iv_generation(self): + pt = b'0123456789ABCDEF' + self.pkcs11.generate_iv = True + ct = self.pkcs11.encrypt(mock.MagicMock(), pt, mock.MagicMock()) + + self.assertEqual(ct['ct'][:len(pt)], pt[::-1]) + self.assertGreater(len(ct['iv']), 0) + + self.assertEqual(2, self.lib.C_GenerateRandom.call_count) + self.assertEqual(1, self.lib.C_EncryptInit.call_count) + self.assertEqual(1, self.lib.C_Encrypt.call_count) + def test_decrypt(self): ct = b'FEDCBA9876543210' + b'0' * self.pkcs11.gcmtagsize iv = b'0' * self.pkcs11.noncesize