Merge "Making iv generation configurable for pkcs11 plugin"
This commit is contained in:
commit
ee3675cfb1
@ -72,6 +72,9 @@ p11_crypto_plugin_opts = [
|
|||||||
cfg.StrOpt('plugin_name',
|
cfg.StrOpt('plugin_name',
|
||||||
help=u._('User friendly plugin name'),
|
help=u._('User friendly plugin name'),
|
||||||
default='PKCS11 HSM'),
|
default='PKCS11 HSM'),
|
||||||
|
cfg.BoolOpt('generate_iv',
|
||||||
|
help=u._('Flag for plugin generated iv case'),
|
||||||
|
default=False),
|
||||||
]
|
]
|
||||||
CONF.register_group(p11_crypto_plugin_group)
|
CONF.register_group(p11_crypto_plugin_group)
|
||||||
CONF.register_opts(p11_crypto_plugin_opts, group=p11_crypto_plugin_group)
|
CONF.register_opts(p11_crypto_plugin_opts, group=p11_crypto_plugin_group)
|
||||||
@ -286,6 +289,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||||||
ffi=ffi,
|
ffi=ffi,
|
||||||
algorithm=plugin_conf.algorithm,
|
algorithm=plugin_conf.algorithm,
|
||||||
seed_random_buffer=seed_random_buffer,
|
seed_random_buffer=seed_random_buffer,
|
||||||
|
generate_iv=plugin_conf.generate_iv,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _reinitialize_pkcs11(self):
|
def _reinitialize_pkcs11(self):
|
||||||
|
@ -325,7 +325,8 @@ def build_ffi():
|
|||||||
class PKCS11(object):
|
class PKCS11(object):
|
||||||
def __init__(self, library_path, login_passphrase, rw_session, slot_id,
|
def __init__(self, library_path, login_passphrase, rw_session, slot_id,
|
||||||
ffi=None, algorithm='CKM_AES_GCM',
|
ffi=None, algorithm='CKM_AES_GCM',
|
||||||
seed_random_buffer=None):
|
seed_random_buffer=None,
|
||||||
|
generate_iv=None):
|
||||||
self.ffi = ffi or build_ffi()
|
self.ffi = ffi or build_ffi()
|
||||||
self.lib = self.ffi.dlopen(library_path)
|
self.lib = self.ffi.dlopen(library_path)
|
||||||
rv = self.lib.C_Initialize(self.ffi.NULL)
|
rv = self.lib.C_Initialize(self.ffi.NULL)
|
||||||
@ -341,6 +342,7 @@ class PKCS11(object):
|
|||||||
self.blocksize = 16
|
self.blocksize = 16
|
||||||
self.noncesize = 12
|
self.noncesize = 12
|
||||||
self.gcmtagsize = 16
|
self.gcmtagsize = 16
|
||||||
|
self.generate_iv = generate_iv
|
||||||
|
|
||||||
# Validate configuration and RNG
|
# Validate configuration and RNG
|
||||||
session = self.get_session()
|
session = self.get_session()
|
||||||
@ -391,21 +393,33 @@ class PKCS11(object):
|
|||||||
return key
|
return key
|
||||||
|
|
||||||
def encrypt(self, key, pt_data, session):
|
def encrypt(self, key, pt_data, session):
|
||||||
ck_mechanism = self._build_gcm_mechanism()
|
iv = None
|
||||||
|
if self.generate_iv:
|
||||||
|
iv = self._generate_random(self.noncesize, session)
|
||||||
|
ck_mechanism = self._build_gcm_mechanism(iv)
|
||||||
rv = self.lib.C_EncryptInit(session, ck_mechanism.mech, key)
|
rv = self.lib.C_EncryptInit(session, ck_mechanism.mech, key)
|
||||||
self._check_error(rv)
|
self._check_error(rv)
|
||||||
|
|
||||||
pt_len = len(pt_data)
|
pt_len = len(pt_data)
|
||||||
ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize * 2)
|
if self.generate_iv:
|
||||||
|
ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize)
|
||||||
|
else:
|
||||||
|
ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize * 2)
|
||||||
ct = self.ffi.new("CK_BYTE[{0}]".format(ct_len[0]))
|
ct = self.ffi.new("CK_BYTE[{0}]".format(ct_len[0]))
|
||||||
rv = self.lib.C_Encrypt(session, pt_data, pt_len, ct, ct_len)
|
rv = self.lib.C_Encrypt(session, pt_data, pt_len, ct, ct_len)
|
||||||
self._check_error(rv)
|
self._check_error(rv)
|
||||||
|
|
||||||
# HSM-generated IVs are appended to the end of the ciphertext
|
if self.generate_iv:
|
||||||
return {
|
return {
|
||||||
"iv": self.ffi.buffer(ct, ct_len[0])[-self.gcmtagsize:],
|
"iv": self.ffi.buffer(iv)[:],
|
||||||
"ct": self.ffi.buffer(ct, ct_len[0])[:-self.gcmtagsize]
|
"ct": self.ffi.buffer(ct, ct_len[0])[:]
|
||||||
}
|
}
|
||||||
|
else:
|
||||||
|
# HSM-generated IVs are appended to the end of the ciphertext
|
||||||
|
return {
|
||||||
|
"iv": self.ffi.buffer(ct, ct_len[0])[-self.gcmtagsize:],
|
||||||
|
"ct": self.ffi.buffer(ct, ct_len[0])[:-self.gcmtagsize]
|
||||||
|
}
|
||||||
|
|
||||||
def decrypt(self, key, iv, ct_data, session):
|
def decrypt(self, key, iv, ct_data, session):
|
||||||
iv = self.ffi.new("CK_BYTE[{0}]".format(len(iv)), iv)
|
iv = self.ffi.new("CK_BYTE[{0}]".format(len(iv)), iv)
|
||||||
|
@ -108,7 +108,11 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
|||||||
return pkcs11.CKR_OK
|
return pkcs11.CKR_OK
|
||||||
|
|
||||||
def _encrypt(self, session, pt, pt_len, ct, ct_len):
|
def _encrypt(self, session, pt, pt_len, ct, ct_len):
|
||||||
self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * (self.pkcs11.gcmtagsize * 2)
|
if self.pkcs11.generate_iv:
|
||||||
|
self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * self.pkcs11.gcmtagsize
|
||||||
|
else:
|
||||||
|
self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * (self.pkcs11.gcmtagsize
|
||||||
|
* 2)
|
||||||
return pkcs11.CKR_OK
|
return pkcs11.CKR_OK
|
||||||
|
|
||||||
def _decrypt(self, session, ct, ct_len, pt, pt_len):
|
def _decrypt(self, session, ct, ct_len, pt, pt_len):
|
||||||
@ -232,8 +236,9 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
|||||||
mock.MagicMock(), mock.MagicMock(),
|
mock.MagicMock(), mock.MagicMock(),
|
||||||
encrypt=True, master_key=True)
|
encrypt=True, master_key=True)
|
||||||
|
|
||||||
def test_encrypt(self):
|
def test_encrypt_with_no_iv_generation(self):
|
||||||
pt = b'0123456789ABCDEF'
|
pt = b'0123456789ABCDEF'
|
||||||
|
self.pkcs11.generate_iv = False
|
||||||
ct = self.pkcs11.encrypt(mock.MagicMock(), pt, mock.MagicMock())
|
ct = self.pkcs11.encrypt(mock.MagicMock(), pt, mock.MagicMock())
|
||||||
|
|
||||||
self.assertEqual(ct['ct'][:len(pt)], pt[::-1])
|
self.assertEqual(ct['ct'][:len(pt)], pt[::-1])
|
||||||
@ -243,6 +248,18 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
|||||||
self.assertEqual(1, self.lib.C_EncryptInit.call_count)
|
self.assertEqual(1, self.lib.C_EncryptInit.call_count)
|
||||||
self.assertEqual(1, self.lib.C_Encrypt.call_count)
|
self.assertEqual(1, self.lib.C_Encrypt.call_count)
|
||||||
|
|
||||||
|
def test_encrypt_with_iv_generation(self):
|
||||||
|
pt = b'0123456789ABCDEF'
|
||||||
|
self.pkcs11.generate_iv = True
|
||||||
|
ct = self.pkcs11.encrypt(mock.MagicMock(), pt, mock.MagicMock())
|
||||||
|
|
||||||
|
self.assertEqual(ct['ct'][:len(pt)], pt[::-1])
|
||||||
|
self.assertGreater(len(ct['iv']), 0)
|
||||||
|
|
||||||
|
self.assertEqual(2, self.lib.C_GenerateRandom.call_count)
|
||||||
|
self.assertEqual(1, self.lib.C_EncryptInit.call_count)
|
||||||
|
self.assertEqual(1, self.lib.C_Encrypt.call_count)
|
||||||
|
|
||||||
def test_decrypt(self):
|
def test_decrypt(self):
|
||||||
ct = b'FEDCBA9876543210' + b'0' * self.pkcs11.gcmtagsize
|
ct = b'FEDCBA9876543210' + b'0' * self.pkcs11.gcmtagsize
|
||||||
iv = b'0' * self.pkcs11.noncesize
|
iv = b'0' * self.pkcs11.noncesize
|
||||||
|
Loading…
Reference in New Issue
Block a user