Making iv generation configurable for pkcs11 plugin
Added configuration flag to enable or disable iv generation as part of pkcs11 plugin logic. In setup where HSM do not have FIPS enabled, in that case pkcs11 plugin is expected to provide iv (initialization vector) for encrypt call. In setup where HSM with FIPS enabled, HSM is going to generate iv and do not need it from plugin side. So logic is updated to generate iv and read returned data accordignly. By default, flag is kept false to match existing behavior. Closes-Bug: #1684997 Change-Id: Ic18d86861a3e51a4370f14c8f7eb39b3f30db2dc
This commit is contained in:
parent
23d064f134
commit
a29e120965
@ -72,6 +72,9 @@ p11_crypto_plugin_opts = [
|
|||||||
cfg.StrOpt('plugin_name',
|
cfg.StrOpt('plugin_name',
|
||||||
help=u._('User friendly plugin name'),
|
help=u._('User friendly plugin name'),
|
||||||
default='PKCS11 HSM'),
|
default='PKCS11 HSM'),
|
||||||
|
cfg.BoolOpt('generate_iv',
|
||||||
|
help=u._('Flag for plugin generated iv case'),
|
||||||
|
default=False),
|
||||||
]
|
]
|
||||||
CONF.register_group(p11_crypto_plugin_group)
|
CONF.register_group(p11_crypto_plugin_group)
|
||||||
CONF.register_opts(p11_crypto_plugin_opts, group=p11_crypto_plugin_group)
|
CONF.register_opts(p11_crypto_plugin_opts, group=p11_crypto_plugin_group)
|
||||||
@ -286,6 +289,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||||||
ffi=ffi,
|
ffi=ffi,
|
||||||
algorithm=plugin_conf.algorithm,
|
algorithm=plugin_conf.algorithm,
|
||||||
seed_random_buffer=seed_random_buffer,
|
seed_random_buffer=seed_random_buffer,
|
||||||
|
generate_iv=plugin_conf.generate_iv,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _reinitialize_pkcs11(self):
|
def _reinitialize_pkcs11(self):
|
||||||
|
@ -325,7 +325,8 @@ def build_ffi():
|
|||||||
class PKCS11(object):
|
class PKCS11(object):
|
||||||
def __init__(self, library_path, login_passphrase, rw_session, slot_id,
|
def __init__(self, library_path, login_passphrase, rw_session, slot_id,
|
||||||
ffi=None, algorithm='CKM_AES_GCM',
|
ffi=None, algorithm='CKM_AES_GCM',
|
||||||
seed_random_buffer=None):
|
seed_random_buffer=None,
|
||||||
|
generate_iv=None):
|
||||||
self.ffi = ffi or build_ffi()
|
self.ffi = ffi or build_ffi()
|
||||||
self.lib = self.ffi.dlopen(library_path)
|
self.lib = self.ffi.dlopen(library_path)
|
||||||
rv = self.lib.C_Initialize(self.ffi.NULL)
|
rv = self.lib.C_Initialize(self.ffi.NULL)
|
||||||
@ -341,6 +342,7 @@ class PKCS11(object):
|
|||||||
self.blocksize = 16
|
self.blocksize = 16
|
||||||
self.noncesize = 12
|
self.noncesize = 12
|
||||||
self.gcmtagsize = 16
|
self.gcmtagsize = 16
|
||||||
|
self.generate_iv = generate_iv
|
||||||
|
|
||||||
# Validate configuration and RNG
|
# Validate configuration and RNG
|
||||||
session = self.get_session()
|
session = self.get_session()
|
||||||
@ -391,21 +393,33 @@ class PKCS11(object):
|
|||||||
return key
|
return key
|
||||||
|
|
||||||
def encrypt(self, key, pt_data, session):
|
def encrypt(self, key, pt_data, session):
|
||||||
ck_mechanism = self._build_gcm_mechanism()
|
iv = None
|
||||||
|
if self.generate_iv:
|
||||||
|
iv = self._generate_random(self.noncesize, session)
|
||||||
|
ck_mechanism = self._build_gcm_mechanism(iv)
|
||||||
rv = self.lib.C_EncryptInit(session, ck_mechanism.mech, key)
|
rv = self.lib.C_EncryptInit(session, ck_mechanism.mech, key)
|
||||||
self._check_error(rv)
|
self._check_error(rv)
|
||||||
|
|
||||||
pt_len = len(pt_data)
|
pt_len = len(pt_data)
|
||||||
ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize * 2)
|
if self.generate_iv:
|
||||||
|
ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize)
|
||||||
|
else:
|
||||||
|
ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize * 2)
|
||||||
ct = self.ffi.new("CK_BYTE[{0}]".format(ct_len[0]))
|
ct = self.ffi.new("CK_BYTE[{0}]".format(ct_len[0]))
|
||||||
rv = self.lib.C_Encrypt(session, pt_data, pt_len, ct, ct_len)
|
rv = self.lib.C_Encrypt(session, pt_data, pt_len, ct, ct_len)
|
||||||
self._check_error(rv)
|
self._check_error(rv)
|
||||||
|
|
||||||
# HSM-generated IVs are appended to the end of the ciphertext
|
if self.generate_iv:
|
||||||
return {
|
return {
|
||||||
"iv": self.ffi.buffer(ct, ct_len[0])[-self.gcmtagsize:],
|
"iv": self.ffi.buffer(iv)[:],
|
||||||
"ct": self.ffi.buffer(ct, ct_len[0])[:-self.gcmtagsize]
|
"ct": self.ffi.buffer(ct, ct_len[0])[:]
|
||||||
}
|
}
|
||||||
|
else:
|
||||||
|
# HSM-generated IVs are appended to the end of the ciphertext
|
||||||
|
return {
|
||||||
|
"iv": self.ffi.buffer(ct, ct_len[0])[-self.gcmtagsize:],
|
||||||
|
"ct": self.ffi.buffer(ct, ct_len[0])[:-self.gcmtagsize]
|
||||||
|
}
|
||||||
|
|
||||||
def decrypt(self, key, iv, ct_data, session):
|
def decrypt(self, key, iv, ct_data, session):
|
||||||
iv = self.ffi.new("CK_BYTE[{0}]".format(len(iv)), iv)
|
iv = self.ffi.new("CK_BYTE[{0}]".format(len(iv)), iv)
|
||||||
|
@ -108,7 +108,11 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
|||||||
return pkcs11.CKR_OK
|
return pkcs11.CKR_OK
|
||||||
|
|
||||||
def _encrypt(self, session, pt, pt_len, ct, ct_len):
|
def _encrypt(self, session, pt, pt_len, ct, ct_len):
|
||||||
self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * (self.pkcs11.gcmtagsize * 2)
|
if self.pkcs11.generate_iv:
|
||||||
|
self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * self.pkcs11.gcmtagsize
|
||||||
|
else:
|
||||||
|
self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * (self.pkcs11.gcmtagsize
|
||||||
|
* 2)
|
||||||
return pkcs11.CKR_OK
|
return pkcs11.CKR_OK
|
||||||
|
|
||||||
def _decrypt(self, session, ct, ct_len, pt, pt_len):
|
def _decrypt(self, session, ct, ct_len, pt, pt_len):
|
||||||
@ -232,8 +236,9 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
|||||||
mock.MagicMock(), mock.MagicMock(),
|
mock.MagicMock(), mock.MagicMock(),
|
||||||
encrypt=True, master_key=True)
|
encrypt=True, master_key=True)
|
||||||
|
|
||||||
def test_encrypt(self):
|
def test_encrypt_with_no_iv_generation(self):
|
||||||
pt = b'0123456789ABCDEF'
|
pt = b'0123456789ABCDEF'
|
||||||
|
self.pkcs11.generate_iv = False
|
||||||
ct = self.pkcs11.encrypt(mock.MagicMock(), pt, mock.MagicMock())
|
ct = self.pkcs11.encrypt(mock.MagicMock(), pt, mock.MagicMock())
|
||||||
|
|
||||||
self.assertEqual(ct['ct'][:len(pt)], pt[::-1])
|
self.assertEqual(ct['ct'][:len(pt)], pt[::-1])
|
||||||
@ -243,6 +248,18 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
|||||||
self.assertEqual(1, self.lib.C_EncryptInit.call_count)
|
self.assertEqual(1, self.lib.C_EncryptInit.call_count)
|
||||||
self.assertEqual(1, self.lib.C_Encrypt.call_count)
|
self.assertEqual(1, self.lib.C_Encrypt.call_count)
|
||||||
|
|
||||||
|
def test_encrypt_with_iv_generation(self):
|
||||||
|
pt = b'0123456789ABCDEF'
|
||||||
|
self.pkcs11.generate_iv = True
|
||||||
|
ct = self.pkcs11.encrypt(mock.MagicMock(), pt, mock.MagicMock())
|
||||||
|
|
||||||
|
self.assertEqual(ct['ct'][:len(pt)], pt[::-1])
|
||||||
|
self.assertGreater(len(ct['iv']), 0)
|
||||||
|
|
||||||
|
self.assertEqual(2, self.lib.C_GenerateRandom.call_count)
|
||||||
|
self.assertEqual(1, self.lib.C_EncryptInit.call_count)
|
||||||
|
self.assertEqual(1, self.lib.C_Encrypt.call_count)
|
||||||
|
|
||||||
def test_decrypt(self):
|
def test_decrypt(self):
|
||||||
ct = b'FEDCBA9876543210' + b'0' * self.pkcs11.gcmtagsize
|
ct = b'FEDCBA9876543210' + b'0' * self.pkcs11.gcmtagsize
|
||||||
iv = b'0' * self.pkcs11.noncesize
|
iv = b'0' * self.pkcs11.noncesize
|
||||||
|
Loading…
Reference in New Issue
Block a user