Configure mechanism for wrapping pKEKs

The PKCS#11 backend key-wraps (encrypts) the project-specific Key
Encryption Keys (pKEKs) using the master encryption key (MKEK).

The mechanism for wrapping/unwrapping the keys was hard-coded to use
CKM_AES_CBC_PAD.  This patch refactors the pkcs11 module to make this
mechanism configurable.

This is necessary to fix Bug #2036506 because some PKCS#11 devices and
software implementations no longer allow CKM_AES_CBC_PAD to be used for
key wrapping.

Supported key wrap mechanisms now include:

* CKM_AES_CBC_PAD
* CKM_AES_KEY_WRAP_PAD
* CKM_AES_KEY_WRAP_KWP

This patch also includes two additional patches so they can all be
tested at the same time:

Fix typo in wrap_key function

This patch fixes a typo in one of the mechanisms in the
PKCS11.wrap_key() function in the pkcs11 module.

and

Increase unit testing coverage for PKCS#11

This patch adds a few tests to increase the test coverage for the
PKCS#11 backend.

Closes-Bug: #2036506
Change-Id: Ic2009a2a55622bb707e884d6a960c044b2248f52
(cherry picked from commit 0d4101fa5d)
(cherry picked from commit 7b36764cd1)
(cherry picked from commit bae6737cb3)
(cherry picked from commit b5841df387)
This commit is contained in:
Douglas Mendizábal 2024-10-25 16:45:58 -04:00
parent d99764e611
commit 6945564c4c
7 changed files with 344 additions and 144 deletions

View File

@ -326,13 +326,16 @@ class HSMCommands(object):
elif type(slotid) is not int:
slotid = int(slotid)
if hmacwrap is None:
hmacwrap = conf.p11_crypto_plugin.hmac_keywrap_mechanism
hmacwrap = conf.p11_crypto_plugin.hmac_mechanism
self.pkcs11 = pkcs11.PKCS11(
library_path=libpath, login_passphrase=passphrase,
rw_session=True, slot_id=slotid,
encryption_mechanism='CKM_AES_CBC',
hmac_keywrap_mechanism=hmacwrap,
library_path=libpath,
login_passphrase=passphrase,
rw_session=conf.p11_crypto_plugin.rw_session,
slot_id=slotid,
encryption_mechanism=conf.p11_crypto_plugin.encryption_mechanism,
hmac_mechanism=hmacwrap,
key_wrap_mechanism=conf.p11_crypto_plugin.key_wrap_mechanism,
token_serial_number=conf.p11_crypto_plugin.token_serial_number,
token_labels=conf.p11_crypto_plugin.token_labels
)

View File

@ -46,7 +46,7 @@ p11_crypto_plugin_opts = [
'devices may require more than one label for Load '
'Balancing or High Availability configurations.')),
cfg.StrOpt('login',
help=u._('Password to login to PKCS11 session'),
help=u._('Password (PIN) to login to PKCS11 session'),
secret=True),
cfg.StrOpt('mkek_label',
help=u._('Master KEK label (as stored in the HSM)')),
@ -77,11 +77,19 @@ p11_crypto_plugin_opts = [
help=u._('HMAC Key Type'),
default='CKK_AES'),
cfg.StrOpt('hmac_keygen_mechanism',
help=u._('HMAC Key Generation Algorithm'),
help=u._('HMAC Key Generation Algorithm used to create the '
'master HMAC Key.'),
default='CKM_AES_KEY_GEN'),
cfg.StrOpt('hmac_keywrap_mechanism',
help=u._('HMAC key wrap mechanism'),
default='CKM_SHA256_HMAC'),
cfg.StrOpt('hmac_mechanism',
help=u._('HMAC algorithm used to sign encrypted data.'),
default='CKM_SHA256_HMAC',
deprecated_name='hmac_keywrap_mechanism'),
cfg.StrOpt('key_wrap_mechanism',
help=u._('Key Wrapping algorithm used to wrap Project KEKs.'),
default='CKM_AES_CBC_PAD'),
cfg.BoolOpt('key_wrap_generate_iv',
help=u._('Generate IVs for Key Wrapping mechanism.'),
default=True),
cfg.StrOpt('seed_file',
help=u._('File to pull entropy for seeding RNG'),
default=''),
@ -134,28 +142,31 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
if plugin_conf.library_path is None:
raise ValueError(u._("library_path is required"))
self.library_path = plugin_conf.library_path
self.login = plugin_conf.login
self.token_serial_number = plugin_conf.token_serial_number
self.token_labels = plugin_conf.token_labels
self.slot_id = plugin_conf.slot_id
self.rw_session = plugin_conf.rw_session
self.seed_file = plugin_conf.seed_file
self.seed_length = plugin_conf.seed_length
self.encryption_mechanism = plugin_conf.encryption_mechanism
self.generate_iv = plugin_conf.aes_gcm_generate_iv
self.encryption_gen_iv = plugin_conf.aes_gcm_generate_iv
self.cka_sensitive = plugin_conf.always_set_cka_sensitive
self.mkek_key_type = 'CKK_AES'
self.mkek_length = plugin_conf.mkek_length
self.mkek_label = plugin_conf.mkek_label
self.hmac_label = plugin_conf.hmac_label
self.hmac_key_type = plugin_conf.hmac_key_type
self.hmac_keygen_mechanism = plugin_conf.hmac_keygen_mechanism
self.hmac_keywrap_mechanism = plugin_conf.hmac_keywrap_mechanism
self.hmac_label = plugin_conf.hmac_label
self.hmac_mechanism = plugin_conf.hmac_mechanism
self.key_wrap_mechanism = plugin_conf.key_wrap_mechanism
self.key_wrap_gen_iv = plugin_conf.key_wrap_generate_iv
self.os_locking_ok = plugin_conf.os_locking_ok
self.pkek_length = plugin_conf.pkek_length
self.pkek_cache_ttl = plugin_conf.pkek_cache_ttl
self.pkek_cache_limit = plugin_conf.pkek_cache_limit
self.rw_session = plugin_conf.rw_session
self.seed_file = plugin_conf.seed_file
self.seed_length = plugin_conf.seed_length
self.slot_id = plugin_conf.slot_id
self.login = plugin_conf.login
self.token_serial_number = plugin_conf.token_serial_number
self.token_labels = plugin_conf.token_labels
# Use specified or create new pkcs11 object
self.pkcs11 = pkcs11 or self._create_pkcs11(ffi)
@ -337,17 +348,19 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
return pkcs11.PKCS11(
library_path=self.library_path,
login_passphrase=self.login,
rw_session=self.rw_session,
slot_id=self.slot_id,
encryption_mechanism=self.encryption_mechanism,
ffi=ffi,
seed_random_buffer=seed_random_buffer,
generate_iv=self.generate_iv,
always_set_cka_sensitive=self.cka_sensitive,
hmac_keywrap_mechanism=self.hmac_keywrap_mechanism,
token_serial_number=self.token_serial_number,
token_labels=self.token_labels,
os_locking_ok=self.os_locking_ok
slot_id=self.slot_id,
rw_session=self.rw_session,
seed_random_buffer=seed_random_buffer,
encryption_mechanism=self.encryption_mechanism,
encryption_gen_iv=self.encryption_gen_iv,
always_set_cka_sensitive=self.cka_sensitive,
hmac_mechanism=self.hmac_mechanism,
key_wrap_mechanism=self.key_wrap_mechanism,
key_wrap_gen_iv=self.key_wrap_gen_iv,
os_locking_ok=self.os_locking_ok,
ffi=ffi
)
def _reinitialize_pkcs11(self):
@ -388,23 +401,33 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
return key
def _load_kek_from_meta_dto(self, kek_meta_dto):
# If plugin_meta is missing the keywrap_mechanism, we default
# to the previously hard-coded CKM_AES_CBC_PAD
_DEFAULT_KEYWRAP_MECHANISM = 'CKM_AES_CBC_PAD'
meta = json.loads(kek_meta_dto.plugin_meta)
keywrap_mechanism = meta.get('key_wrap_mechanism',
_DEFAULT_KEYWRAP_MECHANISM)
LOG.debug("Key Wrap mechanism: %s", keywrap_mechanism)
kek = self._load_kek(
kek_meta_dto.kek_label, meta['iv'], meta['wrapped_key'],
meta['hmac'], meta['mkek_label'], meta['hmac_label']
meta['hmac'], meta['mkek_label'], meta['hmac_label'],
keywrap_mechanism
)
return kek
def _load_kek(self, key_label, iv, wrapped_key, hmac,
mkek_label, hmac_label):
mkek_label, hmac_label, keywrap_mechanism):
with self.pkek_cache_lock:
kek = self._pkek_cache_get(key_label)
if kek is None:
# Decode data
iv = base64.b64decode(iv)
wrapped_key = base64.b64decode(wrapped_key)
hmac = base64.b64decode(hmac)
if iv is None:
kek_data = wrapped_key
else:
iv = base64.b64decode(iv)
kek_data = iv + wrapped_key
hmac = base64.b64decode(hmac)
with self.caching_session_lock:
session = self.caching_session
@ -416,7 +439,11 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
self.pkcs11.verify_hmac(mkhk, hmac, kek_data, session)
# Unwrap KEK
kek = self.pkcs11.unwrap_key(mkek, iv, wrapped_key,
kek = self.pkcs11.unwrap_key(
keywrap_mechanism,
mkek,
iv,
wrapped_key,
session)
self._pkek_cache_add(kek, key_label)
@ -439,16 +466,21 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
wkek = self.pkcs11.wrap_key(mkek, kek, session)
# HMAC Wrapped KEK
if wkek['iv'] is None:
wkek_data = wkek['wrapped_key']
else:
wkek_data = wkek['iv'] + wkek['wrapped_key']
wkek_hmac = self.pkcs11.compute_hmac(mkhk, wkek_data, session)
# Cache KEK
self._pkek_cache_add(kek, key_label)
return {
'iv': base64.b64encode(wkek['iv']),
'iv': wkek['iv'] and base64.b64encode(wkek['iv']),
'wrapped_key': base64.b64encode(wkek['wrapped_key']),
'hmac': base64.b64encode(wkek_hmac),
'mkek_label': self.mkek_label,
'hmac_label': self.hmac_label
'hmac_label': self.hmac_label,
'key_wrap_mechanism': wkek['key_wrap_mechanism']
}

View File

@ -141,7 +141,8 @@ CKM_AES_CBC = 0x1082
CKM_AES_MAC = 0x1083
CKM_AES_CBC_PAD = 0x1085
CKM_AES_GCM = 0x1087
CKM_AES_KEY_WRAP = 0x1090
CKM_AES_KEY_WRAP_PAD = 0x210A
CKM_AES_KEY_WRAP_KWP = 0x210B
CKM_GENERIC_SECRET_KEY_GEN = 0x350
VENDOR_SAFENET_CKM_AES_GCM = 0x8000011c
@ -157,20 +158,30 @@ _ENCRYPTION_MECHANISMS = {
_CBC_IV_SIZE = 16 # bytes
_CBC_BLOCK_SIZE = 128 # bits
# ----- Supported Mechanisms -----
# Barbican only supports the PKCS#11 mechanisms below
_KEY_GEN_MECHANISMS = {
'CKM_AES_KEY_GEN': CKM_AES_KEY_GEN,
'CKM_NC_SHA256_HMAC_KEY_GEN': CKM_NC_SHA256_HMAC_KEY_GEN,
'CKM_GENERIC_SECRET_KEY_GEN': CKM_GENERIC_SECRET_KEY_GEN,
}
_KEY_WRAP_MECHANISMS = {
_HMAC_MECHANISMS = {
'CKM_SHA256_HMAC': CKM_SHA256_HMAC,
'CKM_AES_MAC': CKM_AES_MAC
}
_KEY_WRAP_MECHANISMS = {
'CKM_AES_CBC_PAD': CKM_AES_CBC_PAD,
'CKM_AES_KEY_WRAP_PAD': CKM_AES_KEY_WRAP_PAD,
'CKM_AES_KEY_WRAP_KWP': CKM_AES_KEY_WRAP_KWP
}
CKM_NAMES = dict()
CKM_NAMES.update(_ENCRYPTION_MECHANISMS)
CKM_NAMES.update(_KEY_GEN_MECHANISMS)
CKM_NAMES.update(_HMAC_MECHANISMS)
CKM_NAMES.update(_KEY_WRAP_MECHANISMS)
ERROR_CODES = {
@ -324,6 +335,16 @@ def build_ffi():
CK_BYTE minor;
} CK_VERSION;
typedef struct CK_INFO {
CK_VERSION cryptokiVersion;
CK_UTF8CHAR manufacturerID[32];
CK_FLAGS flags;
CK_UTF8CHAR libraryDescription[32];
CK_VERSION libraryVersion;
} CK_INFO;
typedef CK_INFO * CK_INFO_PTR;
typedef struct CK_SLOT_INFO {
CK_UTF8CHAR slotDescription[64];
CK_UTF8CHAR manufacturerID[32];
@ -384,6 +405,7 @@ def build_ffi():
CK_RV C_GetSessionInfo(CK_SESSION_HANDLE, CK_SESSION_INFO_PTR);
CK_RV C_Login(CK_SESSION_HANDLE, CK_USER_TYPE, CK_UTF8CHAR_PTR,
CK_ULONG);
CK_RV C_GetInfo(CK_INFO_PTR);
CK_RV C_GetSlotList(CK_BBOOL, CK_SLOT_ID_PTR, CK_ULONG_PTR);
CK_RV C_GetSlotInfo(CK_SLOT_ID, CK_SLOT_INFO_PTR);
CK_RV C_GetTokenInfo(CK_SLOT_ID, CK_TOKEN_INFO_PTR);
@ -426,41 +448,31 @@ def build_ffi():
class PKCS11(object):
def __init__(self, library_path, login_passphrase, rw_session, slot_id,
encryption_mechanism=None,
ffi=None, algorithm=None,
seed_random_buffer=None,
generate_iv=None, always_set_cka_sensitive=None,
hmac_keywrap_mechanism='CKM_SHA256_HMAC',
def __init__(self, library_path, login_passphrase,
token_serial_number=None,
token_labels=None,
os_locking_ok=False):
if algorithm:
LOG.warning("WARNING: Using deprecated 'algorithm' argument.")
encryption_mechanism = encryption_mechanism or algorithm
slot_id=None,
rw_session=True,
seed_random_buffer=None,
encryption_mechanism=None,
encryption_gen_iv=True,
always_set_cka_sensitive=True,
hmac_mechanism=None,
key_wrap_mechanism=None,
key_wrap_gen_iv=False,
os_locking_ok=False,
ffi=None):
# Validate all mechanisms are supported
if encryption_mechanism not in _ENCRYPTION_MECHANISMS:
raise ValueError("Invalid encryption_mechanism.")
self.encrypt_mech = _ENCRYPTION_MECHANISMS[encryption_mechanism]
self.encrypt = getattr(
self,
'_{}_encrypt'.format(encryption_mechanism)
)
if hmac_keywrap_mechanism not in _KEY_WRAP_MECHANISMS:
raise ValueError("Invalid HMAC keywrap mechanism")
raise ValueError("Invalid Encryption mechanism.")
if hmac_mechanism not in _HMAC_MECHANISMS:
raise ValueError("Invalid HMAC signing mechanism.")
if key_wrap_mechanism not in _KEY_WRAP_MECHANISMS:
raise ValueError("Invalid Key Wrapping mechanism.")
self.ffi = ffi or build_ffi()
self.lib = self.ffi.dlopen(library_path)
if os_locking_ok:
init_arg_pt = self.ffi.new("CK_C_INITIALIZE_ARGS *")
init_arg_pt.flags = CKF_OS_LOCKING_OK
else:
init_arg_pt = self.ffi.NULL
rv = self.lib.C_Initialize(init_arg_pt)
self._check_error(rv)
self._initialize_library(os_locking_ok)
# Session options
self.login_passphrase = _to_bytes(login_passphrase)
@ -471,13 +483,19 @@ class PKCS11(object):
slot_id)
# Algorithm options
self.algorithm = CKM_NAMES[encryption_mechanism]
self.encrypt_mech = CKM_NAMES[encryption_mechanism]
self.encrypt = getattr(
self,
'_{}_encrypt'.format(encryption_mechanism)
)
self.encrypt_gen_iv = encryption_gen_iv
self.blocksize = 16
self.noncesize = 12
self.gcmtagsize = 16
self.generate_iv = generate_iv
self.always_set_cka_sensitive = always_set_cka_sensitive
self.hmac_keywrap_mechanism = CKM_NAMES[hmac_keywrap_mechanism]
self.hmac_mechanism = CKM_NAMES[hmac_mechanism]
self.key_wrap_mechanism = key_wrap_mechanism
self.key_wrap_gen_iv = key_wrap_gen_iv
# Validate configuration and RNG
session = self.get_session()
@ -487,6 +505,16 @@ class PKCS11(object):
self.return_session(session)
LOG.debug("Connected to PCKS#11 Token in Slot %s", self.slot_id)
def _initialize_library(self, os_locking_ok):
if os_locking_ok:
init_arg_pt = self.ffi.new("CK_C_INITIALIZE_ARGS *")
init_arg_pt.flags = CKF_OS_LOCKING_OK
else:
init_arg_pt = self.ffi.NULL
rv = self.lib.C_Initialize(init_arg_pt)
self._check_error(rv)
def _get_slot_id(self, token_serial_number, token_labels, slot_id):
# First find out how many slots with tokens are available
slots_ptr = self.ffi.new("CK_ULONG_PTR")
@ -640,14 +668,14 @@ class PKCS11(object):
def _VENDOR_SAFENET_CKM_AES_GCM_encrypt(self, key, pt_data, session):
iv = None
if self.generate_iv:
if self.encrypt_gen_iv:
iv = self._generate_random(self.noncesize, session)
ck_mechanism = self._build_gcm_mechanism(iv)
rv = self.lib.C_EncryptInit(session, ck_mechanism.mech, key)
self._check_error(rv)
pt_len = len(pt_data)
if self.generate_iv:
if self.encrypt_gen_iv:
ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize)
else:
ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize * 2)
@ -655,7 +683,7 @@ class PKCS11(object):
rv = self.lib.C_Encrypt(session, pt_data, pt_len, ct, ct_len)
self._check_error(rv)
if self.generate_iv:
if self.encrypt_gen_iv:
return {
"iv": self.ffi.buffer(iv)[:],
"ct": self.ffi.buffer(ct, ct_len[0])[:]
@ -669,7 +697,7 @@ class PKCS11(object):
def _build_gcm_mechanism(self, iv=None):
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = self.algorithm
mech.mechanism = self.encrypt_mech
gcm = self.ffi.new("CK_AES_GCM_PARAMS *")
if iv:
@ -774,11 +802,19 @@ class PKCS11(object):
return obj_handle_ptr[0]
def wrap_key(self, wrapping_key, key_to_wrap, session):
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = CKM_AES_CBC_PAD
iv = self._generate_random(16, session)
mech.parameter = iv
mech.parameter_len = 16
if self.key_wrap_gen_iv:
iv_len = {
'CKM_AES_CBC_PAD': 16, # bytes
'CKM_AES_KEY_WRAP_PAD': 8, # bytes
'CKM_AES_KEY_WRAP_KWP': 4 # bytes
}.get(self.key_wrap_mechanism)
iv = self._generate_random(iv_len, session)
else:
iv = None
mech = self._build_key_wrap_mechanism(
CKM_NAMES[self.key_wrap_mechanism],
iv
)
# Ask for length of the wrapped key
wrapped_key_len = self.ffi.new("CK_ULONG *")
@ -797,18 +833,27 @@ class PKCS11(object):
self._check_error(rv)
return {
'iv': self.ffi.buffer(iv)[:],
'wrapped_key': self.ffi.buffer(wrapped_key, wrapped_key_len[0])[:]
'iv': iv and self.ffi.buffer(iv)[:],
'wrapped_key': self.ffi.buffer(wrapped_key, wrapped_key_len[0])[:],
'key_wrap_mechanism': self.key_wrap_mechanism
}
def unwrap_key(self, wrapping_key, iv, wrapped_key, session):
ck_iv = self.ffi.new("CK_BYTE[]", iv)
ck_wrapped_key = self.ffi.new("CK_BYTE[]", wrapped_key)
unwrapped_key = self.ffi.new("CK_OBJECT_HANDLE *")
def _build_key_wrap_mechanism(self, mechanism, iv):
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = CKM_AES_CBC_PAD
mech.parameter = ck_iv
mech.mechanism = mechanism
if iv is not None:
mech.parameter = iv
mech.parameter_len = len(iv)
return mech
def unwrap_key(self, mechanism, wrapping_key, iv, wrapped_key, session):
ck_wrapped_key = self.ffi.new("CK_BYTE[]", wrapped_key)
ck_iv = iv and self.ffi.new("CK_BYTE[{}]".format(len(iv)), iv)
unwrapped_key = self.ffi.new("CK_OBJECT_HANDLE *")
mech = self._build_key_wrap_mechanism(
CKM_NAMES[mechanism],
ck_iv
)
ck_attributes = self._build_attributes([
Attribute(CKA_CLASS, CKO_SECRET_KEY),
@ -830,7 +875,7 @@ class PKCS11(object):
def compute_hmac(self, hmac_key, data, session):
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = self.hmac_keywrap_mechanism
mech.mechanism = self.hmac_mechanism
rv = self.lib.C_SignInit(session, mech, hmac_key)
self._check_error(rv)
@ -843,7 +888,7 @@ class PKCS11(object):
def verify_hmac(self, hmac_key, sig, data, session):
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = self.hmac_keywrap_mechanism
mech.mechanism = self.hmac_mechanism
rv = self.lib.C_VerifyInit(session, mech, hmac_key)
self._check_error(rv)

View File

@ -13,7 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import builtins
import collections
import os
from unittest import mock
from barbican.common import exception as ex
@ -24,6 +27,11 @@ from barbican.plugin.crypto import pkcs11
from barbican.tests import utils
FakeKEKMetaDTO = collections.namedtuple(
'FakeKEKMetaDTO', 'kek_label, plugin_meta'
)
def generate_random_effect(length, session):
return b'0' * length
@ -41,7 +49,10 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
self.pkcs11.encrypt.return_value = {'iv': b'0', 'ct': b'0'}
self.pkcs11.decrypt.return_value = b'0'
self.pkcs11.generate_key.return_value = int(3)
self.pkcs11.wrap_key.return_value = {'iv': b'1', 'wrapped_key': b'1'}
self.pkcs11.wrap_key.return_value = {
'iv': b'1',
'wrapped_key': b'1',
'key_wrap_mechanism': 'CKM_AES_CBC_PAD'}
self.pkcs11.unwrap_key.return_value = int(4)
self.pkcs11.compute_hmac.return_value = b'1'
self.pkcs11.verify_hmac.return_value = None
@ -63,8 +74,11 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
self.cfg_mock.p11_crypto_plugin.encryption_mechanism = 'CKM_AES_CBC'
self.cfg_mock.p11_crypto_plugin.seed_file = ''
self.cfg_mock.p11_crypto_plugin.seed_length = 32
self.cfg_mock.p11_crypto_plugin.hmac_keywrap_mechanism = \
self.cfg_mock.p11_crypto_plugin.hmac_mechanism = \
'CKM_SHA256_HMAC'
self.cfg_mock.p11_crypto_plugin.key_wrap_mechanism = \
'CKM_AES_CBC_PAD'
self.cfg_mock.p11_crypto_plugin.key_wrap_gen_iv = True
self.plugin_name = 'Test PKCS11 plugin'
self.cfg_mock.p11_crypto_plugin.plugin_name = self.plugin_name
@ -157,7 +171,9 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
def test_decrypt(self):
ct = b'ctct'
kek_meta_extended = '{"iv":"AAAA","mechanism":"CKM_AES_CBC"}'
kek_meta_extended = ('{"iv":"AAAA",'
'"mechanism":"CKM_AES_CBC",'
'"key_wrap_mechanism":"CKM_AES_CBC_PAD"}')
decrypt_dto = plugin_import.DecryptDTO(ct)
kek_meta = mock.MagicMock()
kek_meta.kek_label = 'pkek'
@ -355,3 +371,45 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
def test_get_plugin_name(self):
self.assertEqual(self.plugin_name, self.plugin.get_plugin_name())
def test_load_kek_from_meta_dto_no_key_wrap_mechanism(self):
key = base64.b64encode(os.urandom(32)).decode('UTF-8')
hmac = base64.b64encode(os.urandom(16)).decode('UTF-8')
fake_dto = FakeKEKMetaDTO('test_kek', p11_crypto.json_dumps_compact({
'iv': None,
'wrapped_key': key,
'hmac': hmac,
'mkek_label': 'test_mkek',
'hmac_label': 'test_hmac'
}))
load_mock = mock.MagicMock()
self.plugin._load_kek = load_mock
self.plugin._load_kek_from_meta_dto(fake_dto)
# key_wrap_mechanism should default to 'CKM_AES_CBC_PAD'
load_mock.assert_called_with(
'test_kek', None, key, hmac,
'test_mkek', 'test_hmac', 'CKM_AES_CBC_PAD')
def test_load_kek_no_iv(self):
key = os.urandom(32)
wrapped = base64.b64encode(key).decode('UTF-8')
hmac = base64.b64encode(os.urandom(16)).decode('UTF-8')
self.plugin._load_kek('test_key', None, wrapped, hmac, 'mkek_label',
'hmac_label', 'CKM_AES_KEY_WRAP_KWP')
key in self.pkcs11.verify_hmac.call_args.args
def test_generate_wrapped_kek_no_iv(self):
wrapped = base64.b64encode(os.urandom(32))
self.pkcs11.wrap_key.return_value = {
'iv': None,
'wrapped_key': wrapped,
'key_wrap_mechanism': 'CKM_AES_KEY_WRAP_KWP'
}
_ = self.plugin._generate_wrapped_kek(32, 'test_kek')
wrapped in self.pkcs11.compute_hmac.call_args.args

View File

@ -59,18 +59,22 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.cfg_mock.rw_session = False
self.cfg_mock.slot_id = 1
self.cfg_mock.encryption_mechanism = 'CKM_AES_CBC'
self.cfg_mock.hmac_keywrap_mechanism = 'CKM_SHA256_HMAC'
self.cfg_mock.hmac_mechanism = 'CKM_SHA256_HMAC'
self.cfg_mock.key_wrap_mechanism = 'CKM_AES_KEY_WRAP_KWP'
self.token_mock = mock.MagicMock()
self.token_mock.label = b'myLabel'
self.token_mock.serial_number = b'111111'
self.pkcs11 = pkcs11.PKCS11(
self.cfg_mock.library_path, self.cfg_mock.login_passphrase,
self.cfg_mock.rw_session, self.cfg_mock.slot_id,
self.cfg_mock.encryption_mechanism,
self.cfg_mock.library_path,
self.cfg_mock.login_passphrase,
slot_id=self.cfg_mock.slot_id,
rw_session=self.cfg_mock.rw_session,
encryption_mechanism=self.cfg_mock.encryption_mechanism,
hmac_mechanism=self.cfg_mock.hmac_mechanism,
key_wrap_mechanism=self.cfg_mock.key_wrap_mechanism,
ffi=self.ffi,
hmac_keywrap_mechanism=self.cfg_mock.hmac_keywrap_mechanism
)
def _generate_random(self, session, buf, length):
@ -140,7 +144,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
return pkcs11.CKR_OK
def _encrypt(self, session, pt, pt_len, ct, ct_len):
if self.pkcs11.generate_iv:
if self.pkcs11.encrypt_gen_iv:
self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * self.pkcs11.gcmtagsize
else:
self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * (self.pkcs11.gcmtagsize
@ -174,6 +178,33 @@ class WhenTestingPKCS11(utils.BaseTestCase):
def _verify(self, *args, **kwargs):
return pkcs11.CKR_OK
def test_init_raises_invalid_encryption_mechanism(self):
self.assertRaises(
ValueError,
pkcs11.PKCS11,
self.cfg_mock.library_path,
self.cfg_mock.login_passphrase,
encryption_mechanism='CKM_BOGUS')
def test_init_raises_invalid_hmac_mechanism(self):
self.assertRaises(
ValueError,
pkcs11.PKCS11,
self.cfg_mock.library_path,
self.cfg_mock.login_passphrase,
encryption_mechanism='CKM_AES_GCM',
hmac_mechanism='CKM_BOGUS')
def test_init_raises_invalid_key_wrap_mechanism(self):
self.assertRaises(
ValueError,
pkcs11.PKCS11,
self.cfg_mock.library_path,
self.cfg_mock.login_passphrase,
encryption_mechanism='CKM_AES_GCM',
hmac_mechanism='CKM_SHA256_HMAC',
key_wrap_mechanism='CKM_BOGUS')
def test_get_slot_id_from_serial_number(self):
slot_id = self.pkcs11._get_slot_id('111111', None, 2)
self.assertEqual(1, slot_id)
@ -313,7 +344,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
def test_encrypt_with_no_iv_generation(self):
pt = b'0123456789ABCDEF'
self.pkcs11.generate_iv = False
self.pkcs11.encrypt_gen_iv = False
ct = self.pkcs11._VENDOR_SAFENET_CKM_AES_GCM_encrypt(
mock.MagicMock(),
pt, mock.MagicMock()
@ -328,7 +359,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
def test_encrypt_with_iv_generation(self):
pt = b'0123456789ABCDEF'
self.pkcs11.generate_iv = True
self.pkcs11.encrypt_gen_iv = True
ct = self.pkcs11._VENDOR_SAFENET_CKM_AES_GCM_encrypt(
mock.MagicMock(), pt, mock.MagicMock()
)
@ -412,7 +443,8 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.assertEqual(1, self.lib.C_DecryptInit.call_count)
self.assertEqual(1, self.lib.C_Decrypt.call_count)
def test_wrap_key(self):
def test_wrap_key_with_iv_generation(self):
self.pkcs11.key_wrap_gen_iv = True
wkek = self.pkcs11.wrap_key(mock.Mock(), mock.Mock(), mock.Mock())
self.assertGreater(len(wkek['iv']), 0)
self.assertEqual(b'0' * 16, wkek['wrapped_key'])
@ -420,9 +452,22 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.assertEqual(2, self.lib.C_GenerateRandom.call_count)
self.assertEqual(2, self.lib.C_WrapKey.call_count)
def test_wrap_key_no_iv_generation(self):
self.pkcs11.key_wrap_gen_iv = False
wkek = self.pkcs11.wrap_key(mock.Mock(), mock.Mock(), mock.Mock())
self.assertIsNone(wkek['iv'])
self.assertEqual(b'0' * 16, wkek['wrapped_key'])
self.assertEqual(1, self.lib.C_GenerateRandom.call_count)
self.assertEqual(2, self.lib.C_WrapKey.call_count)
def test_unwrap_key(self):
kek = self.pkcs11.unwrap_key(mock.Mock(), b'0' * 16,
b'0' * 16, mock.Mock())
kek = self.pkcs11.unwrap_key(
'CKM_AES_CBC_PAD',
b'0' * 16,
b'0' * 16,
b'0' * 16,
mock.Mock())
self.assertEqual(1, kek)
self.assertEqual(self.lib.C_UnwrapKey.call_count, 1)

View File

@ -91,70 +91,65 @@ and signed with HMAC key. Both MKEK and HMAC resides in the HSM.
The configuration for this plugin in ``/etc/barbican/barbican.conf``.
Settings for some different HSMs are provided below:
Thales Luna Network HSM (Safenet)
+++++++++++++++++++++++++++++++++
Thales Luna Network HSM
+++++++++++++++++++++++
The PKCS#11 plugin configuration for Luna Network HSM looks like:
.. code-block:: ini
# ================= Secret Store Plugin ===================
[secretstore]
..
enabled_secretstore_plugins = store_crypto
enable_multiple_secret_stores = True
stores_lookup_suffix = luna
# ========== Secret Store configuration ==========
[secretstore:luna]
secret_store_plugin = store_crypto
crypto_plugin = p11_crypto
# ================= Crypto plugin ===================
[crypto]
..
enabled_crypto_plugins = p11_crypto
[p11_crypto_plugin]
# Path to vendor PKCS11 library
library_path = '/usr/lib/libCryptoki2_64.so'
# Token serial number used to identify the token to be used. Required
# when the device has multiple tokens with the same label. (string
# value)
# Token serial number for the token to be used. Required
# when the device has multiple tokens with the same label.
# (string value)
#token_serial_number = 12345678
# Token label used to identify the token to be used. Required when
# Token label for the token to be used. Required when
# token_serial_number is not specified. (string value)
#token_label = <None>
token_labels = myPCKS11Token
# Password to login to PKCS11 session
# (Optional) HSM Slot ID that contains the token device to be used.
# Required when token_serial_number and token_labels are not specified.
# (integer value)
#slot_id = 0
# Password (PIN) to login to PKCS11 session
login = 'mypassword'
# Encryption algorithm used to encrypt secrets
encryption_mechanism = CKM_AES_CBC_GCM
# Label to identify master KEK in the HSM (must not be the same as HMAC label)
mkek_label = 'my_mkek_label'
# Length in bytes of master KEK
mkek_length = 32
# Label to identify HMAC key in the HSM (must not be the same as MKEK label)
# Label to identify master HMAC key in the HSM (must not be the same as MKEK label)
hmac_label = 'my_hmac_label'
# (Optional) HSM Slot ID that contains the token device to be used.
# (integer value)
slot_id = 1
# Key Type for the master HMAC key
hmac_key_type = CKK_GENERIC_SECRET
# HMAC Key Generation Algorithm used to create the master HMAC Key
hmac_keygen_mechanism = CKM_GENERIC_SECRET_KEY_GEN
# Enable Read/Write session with the HSM?
# rw_session = True
# HMAC algorith used to sign ecnrypted data
hmac_mechanism = CKM_SHA256_HMAC
# Length of Project KEKs to create
# pkek_length = 32
# Key Wrap algorithm used to wrap Project KEKs
key_wrap_mechanism = CKM_AES_KEY_WRAP_KWP
# How long to cache unwrapped Project KEKs
# pkek_cache_ttl = 900
# Max number of items in pkek cache
# pkek_cache_limit = 100
.. note::
Barbican does not support FIPS mode enabled for SafeNet Luna HSM or
Data Protection on Demand HSM. Make sure that it's operating in non-FIPS
mode while integrating with Barbican.
The HMAC and MKEK keys can be generated as follows:

View File

@ -0,0 +1,22 @@
---
deprecations:
- |
The `[p11_crypto_plugin]hmac_keywrap_mechanism` option has been replaced
by `[p11_crypto_plugin]hmac_mechanism`. This option was renamed to avoid
confusion since this mechanism is only used to sign encrypted data and
never used for key wrap encryption.
security:
- |
The PKCS#11 backend driver has been updated to support newer Key Wrap
mechanisms. New deployments should use CKM_AES_KEY_WRAP_KWP, but
CKM_AES_KEY_WRAP_PAD and CKM_AES_CBC_PAD are also supported for
compatibility with older devices that have not yet implemented PKCS#11
Version 3.0.
fixes:
- |
Fixed Bug #2036506 - This patch replaces the hard-coded CKM_AES_CBC_PAD
mechanism used to wrap pKEKs with an option to configure this mechanism.
Two new options have been added to the [p11_crypto_plugin] section of the
configuration file: `key_wrap_mechanism` and `key_wrap_generate_iv`. These
options default to `CKM_AES_CBC_PAD` and `True` respectively to preserve
backwards compatibility.