Configure mechanism for wrapping pKEKs

The PKCS#11 backend key-wraps (encrypts) the project-specific Key
Encryption Keys (pKEKs) using the master encryption key (MKEK).

The mechanism for wrapping/unwrapping the keys was hard-coded to use
CKM_AES_CBC_PAD.  This patch refactors the pkcs11 module to make this
mechanism configurable.

This is necessary to fix Bug #2036506 because some PKCS#11 devices and
software implementations no longer allow CKM_AES_CBC_PAD to be used for
key wrapping.

Supported key wrap mechanisms now include:

* CKM_AES_CBC_PAD
* CKM_AES_KEY_WRAP_PAD
* CKM_AES_KEY_WRAP_KWP

This patch also includes two additional patches so they can all be
tested at the same time:

Fix typo in wrap_key function

This patch fixes a typo in one of the mechanisms in the
PKCS11.wrap_key() function in the pkcs11 module.

and

Increase unit testing coverage for PKCS#11

This patch adds a few tests to increase the test coverage for the
PKCS#11 backend.

Closes-Bug: #2036506
Change-Id: Ic2009a2a55622bb707e884d6a960c044b2248f52
(cherry picked from commit 0d4101fa5da52f242ab0a52955f67769b23485a1)
(cherry picked from commit 7b36764cd12781bdb1acc37dcd52dd4e6637171e)
(cherry picked from commit bae6737cb33ebe47c0655a704ff434539db3dc00)
(cherry picked from commit b5841df387e5ab38caf173950a1d98ab37a51453)
(cherry picked from commit 6945564c4c3c8203f779d17b41e4c38d30664d84)
(cherry picked from commit 20e4946cb8ae5c3465ee033696ca5180b4c50c43)
This commit is contained in:
Douglas Mendizábal 2024-10-25 16:45:58 -04:00 committed by Douglas Mendizabal
parent 004129861b
commit 42b4c41831
7 changed files with 344 additions and 149 deletions

View File

@ -326,13 +326,16 @@ class HSMCommands(object):
elif type(slotid) is not int:
slotid = int(slotid)
if hmacwrap is None:
hmacwrap = conf.p11_crypto_plugin.hmac_keywrap_mechanism
hmacwrap = conf.p11_crypto_plugin.hmac_mechanism
self.pkcs11 = pkcs11.PKCS11(
library_path=libpath, login_passphrase=passphrase,
rw_session=True, slot_id=slotid,
encryption_mechanism='CKM_AES_CBC',
hmac_keywrap_mechanism=hmacwrap,
library_path=libpath,
login_passphrase=passphrase,
rw_session=conf.p11_crypto_plugin.rw_session,
slot_id=slotid,
encryption_mechanism=conf.p11_crypto_plugin.encryption_mechanism,
hmac_mechanism=hmacwrap,
key_wrap_mechanism=conf.p11_crypto_plugin.key_wrap_mechanism,
token_serial_number=conf.p11_crypto_plugin.token_serial_number,
token_labels=conf.p11_crypto_plugin.token_labels
)

View File

@ -50,7 +50,7 @@ p11_crypto_plugin_opts = [
'devices may require more than one label for Load '
'Balancing or High Availability configurations.')),
cfg.StrOpt('login',
help=u._('Password to login to PKCS11 session'),
help=u._('Password (PIN) to login to PKCS11 session'),
secret=True),
cfg.StrOpt('mkek_label',
help=u._('Master KEK label (as stored in the HSM)')),
@ -81,11 +81,19 @@ p11_crypto_plugin_opts = [
help=u._('HMAC Key Type'),
default='CKK_AES'),
cfg.StrOpt('hmac_keygen_mechanism',
help=u._('HMAC Key Generation Algorithm'),
help=u._('HMAC Key Generation Algorithm used to create the '
'master HMAC Key.'),
default='CKM_AES_KEY_GEN'),
cfg.StrOpt('hmac_keywrap_mechanism',
help=u._('HMAC key wrap mechanism'),
default='CKM_SHA256_HMAC'),
cfg.StrOpt('hmac_mechanism',
help=u._('HMAC algorithm used to sign encrypted data.'),
default='CKM_SHA256_HMAC',
deprecated_name='hmac_keywrap_mechanism'),
cfg.StrOpt('key_wrap_mechanism',
help=u._('Key Wrapping algorithm used to wrap Project KEKs.'),
default='CKM_AES_CBC_PAD'),
cfg.BoolOpt('key_wrap_generate_iv',
help=u._('Generate IVs for Key Wrapping mechanism.'),
default=True),
cfg.StrOpt('seed_file',
help=u._('File to pull entropy for seeding RNG'),
default=''),
@ -138,33 +146,31 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
if plugin_conf.library_path is None:
raise ValueError(u._("library_path is required"))
self.library_path = plugin_conf.library_path
self.login = plugin_conf.login
self.token_serial_number = plugin_conf.token_serial_number
self.token_labels = plugin_conf.token_labels
self.slot_id = plugin_conf.slot_id
self.rw_session = plugin_conf.rw_session
self.seed_file = plugin_conf.seed_file
self.seed_length = plugin_conf.seed_length
self.encryption_mechanism = plugin_conf.encryption_mechanism
self.generate_iv = plugin_conf.aes_gcm_generate_iv
self.encryption_gen_iv = plugin_conf.aes_gcm_generate_iv
self.cka_sensitive = plugin_conf.always_set_cka_sensitive
self.mkek_key_type = 'CKK_AES'
self.mkek_length = plugin_conf.mkek_length
self.mkek_label = plugin_conf.mkek_label
self.hmac_label = plugin_conf.hmac_label
self.hmac_key_type = plugin_conf.hmac_key_type
self.hmac_keygen_mechanism = plugin_conf.hmac_keygen_mechanism
self.hmac_keywrap_mechanism = plugin_conf.hmac_keywrap_mechanism
self.hmac_label = plugin_conf.hmac_label
self.hmac_mechanism = plugin_conf.hmac_mechanism
self.key_wrap_mechanism = plugin_conf.key_wrap_mechanism
self.key_wrap_gen_iv = plugin_conf.key_wrap_generate_iv
self.os_locking_ok = plugin_conf.os_locking_ok
self.pkek_length = plugin_conf.pkek_length
self.pkek_cache_ttl = plugin_conf.pkek_cache_ttl
self.pkek_cache_limit = plugin_conf.pkek_cache_limit
self.rw_session = plugin_conf.rw_session
self.seed_file = plugin_conf.seed_file
self.seed_length = plugin_conf.seed_length
self.slot_id = plugin_conf.slot_id
self.login = plugin_conf.login
self.token_serial_number = plugin_conf.token_serial_number
self.token_labels = plugin_conf.token_labels or list()
if plugin_conf.token_label:
LOG.warning('Using deprecated option "token_label". Please update '
'your configuration file.')
if plugin_conf.token_label not in self.token_labels:
self.token_labels.append(plugin_conf.token_label)
# Use specified or create new pkcs11 object
self.pkcs11 = pkcs11 or self._create_pkcs11(ffi)
@ -346,17 +352,19 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
return pkcs11.PKCS11(
library_path=self.library_path,
login_passphrase=self.login,
rw_session=self.rw_session,
slot_id=self.slot_id,
encryption_mechanism=self.encryption_mechanism,
ffi=ffi,
seed_random_buffer=seed_random_buffer,
generate_iv=self.generate_iv,
always_set_cka_sensitive=self.cka_sensitive,
hmac_keywrap_mechanism=self.hmac_keywrap_mechanism,
token_serial_number=self.token_serial_number,
token_labels=self.token_labels,
os_locking_ok=self.os_locking_ok
slot_id=self.slot_id,
rw_session=self.rw_session,
seed_random_buffer=seed_random_buffer,
encryption_mechanism=self.encryption_mechanism,
encryption_gen_iv=self.encryption_gen_iv,
always_set_cka_sensitive=self.cka_sensitive,
hmac_mechanism=self.hmac_mechanism,
key_wrap_mechanism=self.key_wrap_mechanism,
key_wrap_gen_iv=self.key_wrap_gen_iv,
os_locking_ok=self.os_locking_ok,
ffi=ffi
)
def _reinitialize_pkcs11(self):
@ -397,23 +405,33 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
return key
def _load_kek_from_meta_dto(self, kek_meta_dto):
# If plugin_meta is missing the keywrap_mechanism, we default
# to the previously hard-coded CKM_AES_CBC_PAD
_DEFAULT_KEYWRAP_MECHANISM = 'CKM_AES_CBC_PAD'
meta = json.loads(kek_meta_dto.plugin_meta)
keywrap_mechanism = meta.get('key_wrap_mechanism',
_DEFAULT_KEYWRAP_MECHANISM)
LOG.debug("Key Wrap mechanism: %s", keywrap_mechanism)
kek = self._load_kek(
kek_meta_dto.kek_label, meta['iv'], meta['wrapped_key'],
meta['hmac'], meta['mkek_label'], meta['hmac_label']
meta['hmac'], meta['mkek_label'], meta['hmac_label'],
keywrap_mechanism
)
return kek
def _load_kek(self, key_label, iv, wrapped_key, hmac,
mkek_label, hmac_label):
mkek_label, hmac_label, keywrap_mechanism):
with self.pkek_cache_lock:
kek = self._pkek_cache_get(key_label)
if kek is None:
# Decode data
iv = base64.b64decode(iv)
wrapped_key = base64.b64decode(wrapped_key)
if iv is None:
kek_data = wrapped_key
else:
iv = base64.b64decode(iv)
kek_data = iv + wrapped_key
hmac = base64.b64decode(hmac)
kek_data = iv + wrapped_key
with self.caching_session_lock:
session = self.caching_session
@ -425,8 +443,12 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
self.pkcs11.verify_hmac(mkhk, hmac, kek_data, session)
# Unwrap KEK
kek = self.pkcs11.unwrap_key(mkek, iv, wrapped_key,
session)
kek = self.pkcs11.unwrap_key(
keywrap_mechanism,
mkek,
iv,
wrapped_key,
session)
self._pkek_cache_add(kek, key_label)
@ -448,16 +470,21 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
wkek = self.pkcs11.wrap_key(mkek, kek, session)
# HMAC Wrapped KEK
wkek_data = wkek['iv'] + wkek['wrapped_key']
if wkek['iv'] is None:
wkek_data = wkek['wrapped_key']
else:
wkek_data = wkek['iv'] + wkek['wrapped_key']
wkek_hmac = self.pkcs11.compute_hmac(mkhk, wkek_data, session)
# Cache KEK
self._pkek_cache_add(kek, key_label)
return {
'iv': base64.b64encode(wkek['iv']),
'iv': wkek['iv'] and base64.b64encode(wkek['iv']),
'wrapped_key': base64.b64encode(wkek['wrapped_key']),
'hmac': base64.b64encode(wkek_hmac),
'mkek_label': self.mkek_label,
'hmac_label': self.hmac_label
'hmac_label': self.hmac_label,
'key_wrap_mechanism': wkek['key_wrap_mechanism']
}

View File

@ -141,7 +141,8 @@ CKM_AES_CBC = 0x1082
CKM_AES_MAC = 0x1083
CKM_AES_CBC_PAD = 0x1085
CKM_AES_GCM = 0x1087
CKM_AES_KEY_WRAP = 0x1090
CKM_AES_KEY_WRAP_PAD = 0x210A
CKM_AES_KEY_WRAP_KWP = 0x210B
CKM_GENERIC_SECRET_KEY_GEN = 0x350
VENDOR_SAFENET_CKM_AES_GCM = 0x8000011c
@ -157,20 +158,30 @@ _ENCRYPTION_MECHANISMS = {
_CBC_IV_SIZE = 16 # bytes
_CBC_BLOCK_SIZE = 128 # bits
# ----- Supported Mechanisms -----
# Barbican only supports the PKCS#11 mechanisms below
_KEY_GEN_MECHANISMS = {
'CKM_AES_KEY_GEN': CKM_AES_KEY_GEN,
'CKM_NC_SHA256_HMAC_KEY_GEN': CKM_NC_SHA256_HMAC_KEY_GEN,
'CKM_GENERIC_SECRET_KEY_GEN': CKM_GENERIC_SECRET_KEY_GEN,
}
_KEY_WRAP_MECHANISMS = {
_HMAC_MECHANISMS = {
'CKM_SHA256_HMAC': CKM_SHA256_HMAC,
'CKM_AES_MAC': CKM_AES_MAC
}
_KEY_WRAP_MECHANISMS = {
'CKM_AES_CBC_PAD': CKM_AES_CBC_PAD,
'CKM_AES_KEY_WRAP_PAD': CKM_AES_KEY_WRAP_PAD,
'CKM_AES_KEY_WRAP_KWP': CKM_AES_KEY_WRAP_KWP
}
CKM_NAMES = dict()
CKM_NAMES.update(_ENCRYPTION_MECHANISMS)
CKM_NAMES.update(_KEY_GEN_MECHANISMS)
CKM_NAMES.update(_HMAC_MECHANISMS)
CKM_NAMES.update(_KEY_WRAP_MECHANISMS)
ERROR_CODES = {
@ -324,6 +335,16 @@ def build_ffi():
CK_BYTE minor;
} CK_VERSION;
typedef struct CK_INFO {
CK_VERSION cryptokiVersion;
CK_UTF8CHAR manufacturerID[32];
CK_FLAGS flags;
CK_UTF8CHAR libraryDescription[32];
CK_VERSION libraryVersion;
} CK_INFO;
typedef CK_INFO * CK_INFO_PTR;
typedef struct CK_SLOT_INFO {
CK_UTF8CHAR slotDescription[64];
CK_UTF8CHAR manufacturerID[32];
@ -384,6 +405,7 @@ def build_ffi():
CK_RV C_GetSessionInfo(CK_SESSION_HANDLE, CK_SESSION_INFO_PTR);
CK_RV C_Login(CK_SESSION_HANDLE, CK_USER_TYPE, CK_UTF8CHAR_PTR,
CK_ULONG);
CK_RV C_GetInfo(CK_INFO_PTR);
CK_RV C_GetSlotList(CK_BBOOL, CK_SLOT_ID_PTR, CK_ULONG_PTR);
CK_RV C_GetSlotInfo(CK_SLOT_ID, CK_SLOT_INFO_PTR);
CK_RV C_GetTokenInfo(CK_SLOT_ID, CK_TOKEN_INFO_PTR);
@ -426,41 +448,31 @@ def build_ffi():
class PKCS11(object):
def __init__(self, library_path, login_passphrase, rw_session, slot_id,
encryption_mechanism=None,
ffi=None, algorithm=None,
seed_random_buffer=None,
generate_iv=None, always_set_cka_sensitive=None,
hmac_keywrap_mechanism='CKM_SHA256_HMAC',
def __init__(self, library_path, login_passphrase,
token_serial_number=None,
token_labels=None,
os_locking_ok=False):
if algorithm:
LOG.warning("WARNING: Using deprecated 'algorithm' argument.")
encryption_mechanism = encryption_mechanism or algorithm
slot_id=None,
rw_session=True,
seed_random_buffer=None,
encryption_mechanism=None,
encryption_gen_iv=True,
always_set_cka_sensitive=True,
hmac_mechanism=None,
key_wrap_mechanism=None,
key_wrap_gen_iv=False,
os_locking_ok=False,
ffi=None):
# Validate all mechanisms are supported
if encryption_mechanism not in _ENCRYPTION_MECHANISMS:
raise ValueError("Invalid encryption_mechanism.")
self.encrypt_mech = _ENCRYPTION_MECHANISMS[encryption_mechanism]
self.encrypt = getattr(
self,
'_{}_encrypt'.format(encryption_mechanism)
)
if hmac_keywrap_mechanism not in _KEY_WRAP_MECHANISMS:
raise ValueError("Invalid HMAC keywrap mechanism")
raise ValueError("Invalid Encryption mechanism.")
if hmac_mechanism not in _HMAC_MECHANISMS:
raise ValueError("Invalid HMAC signing mechanism.")
if key_wrap_mechanism not in _KEY_WRAP_MECHANISMS:
raise ValueError("Invalid Key Wrapping mechanism.")
self.ffi = ffi or build_ffi()
self.lib = self.ffi.dlopen(library_path)
if os_locking_ok:
init_arg_pt = self.ffi.new("CK_C_INITIALIZE_ARGS *")
init_arg_pt.flags = CKF_OS_LOCKING_OK
else:
init_arg_pt = self.ffi.NULL
rv = self.lib.C_Initialize(init_arg_pt)
self._check_error(rv)
self._initialize_library(os_locking_ok)
# Session options
self.login_passphrase = _to_bytes(login_passphrase)
@ -471,13 +483,19 @@ class PKCS11(object):
slot_id)
# Algorithm options
self.algorithm = CKM_NAMES[encryption_mechanism]
self.encrypt_mech = CKM_NAMES[encryption_mechanism]
self.encrypt = getattr(
self,
'_{}_encrypt'.format(encryption_mechanism)
)
self.encrypt_gen_iv = encryption_gen_iv
self.blocksize = 16
self.noncesize = 12
self.gcmtagsize = 16
self.generate_iv = generate_iv
self.always_set_cka_sensitive = always_set_cka_sensitive
self.hmac_keywrap_mechanism = CKM_NAMES[hmac_keywrap_mechanism]
self.hmac_mechanism = CKM_NAMES[hmac_mechanism]
self.key_wrap_mechanism = key_wrap_mechanism
self.key_wrap_gen_iv = key_wrap_gen_iv
# Validate configuration and RNG
session = self.get_session()
@ -487,6 +505,16 @@ class PKCS11(object):
self.return_session(session)
LOG.debug("Connected to PCKS#11 Token in Slot %s", self.slot_id)
def _initialize_library(self, os_locking_ok):
if os_locking_ok:
init_arg_pt = self.ffi.new("CK_C_INITIALIZE_ARGS *")
init_arg_pt.flags = CKF_OS_LOCKING_OK
else:
init_arg_pt = self.ffi.NULL
rv = self.lib.C_Initialize(init_arg_pt)
self._check_error(rv)
def _get_slot_id(self, token_serial_number, token_labels, slot_id):
# First find out how many slots with tokens are available
slots_ptr = self.ffi.new("CK_ULONG_PTR")
@ -640,14 +668,14 @@ class PKCS11(object):
def _VENDOR_SAFENET_CKM_AES_GCM_encrypt(self, key, pt_data, session):
iv = None
if self.generate_iv:
if self.encrypt_gen_iv:
iv = self._generate_random(self.noncesize, session)
ck_mechanism = self._build_gcm_mechanism(iv)
rv = self.lib.C_EncryptInit(session, ck_mechanism.mech, key)
self._check_error(rv)
pt_len = len(pt_data)
if self.generate_iv:
if self.encrypt_gen_iv:
ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize)
else:
ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize * 2)
@ -655,7 +683,7 @@ class PKCS11(object):
rv = self.lib.C_Encrypt(session, pt_data, pt_len, ct, ct_len)
self._check_error(rv)
if self.generate_iv:
if self.encrypt_gen_iv:
return {
"iv": self.ffi.buffer(iv)[:],
"ct": self.ffi.buffer(ct, ct_len[0])[:]
@ -669,7 +697,7 @@ class PKCS11(object):
def _build_gcm_mechanism(self, iv=None):
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = self.algorithm
mech.mechanism = self.encrypt_mech
gcm = self.ffi.new("CK_AES_GCM_PARAMS *")
if iv:
@ -774,11 +802,19 @@ class PKCS11(object):
return obj_handle_ptr[0]
def wrap_key(self, wrapping_key, key_to_wrap, session):
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = CKM_AES_CBC_PAD
iv = self._generate_random(16, session)
mech.parameter = iv
mech.parameter_len = 16
if self.key_wrap_gen_iv:
iv_len = {
'CKM_AES_CBC_PAD': 16, # bytes
'CKM_AES_KEY_WRAP_PAD': 8, # bytes
'CKM_AES_KEY_WRAP_KWP': 4 # bytes
}.get(self.key_wrap_mechanism)
iv = self._generate_random(iv_len, session)
else:
iv = None
mech = self._build_key_wrap_mechanism(
CKM_NAMES[self.key_wrap_mechanism],
iv
)
# Ask for length of the wrapped key
wrapped_key_len = self.ffi.new("CK_ULONG *")
@ -797,18 +833,27 @@ class PKCS11(object):
self._check_error(rv)
return {
'iv': self.ffi.buffer(iv)[:],
'wrapped_key': self.ffi.buffer(wrapped_key, wrapped_key_len[0])[:]
'iv': iv and self.ffi.buffer(iv)[:],
'wrapped_key': self.ffi.buffer(wrapped_key, wrapped_key_len[0])[:],
'key_wrap_mechanism': self.key_wrap_mechanism
}
def unwrap_key(self, wrapping_key, iv, wrapped_key, session):
ck_iv = self.ffi.new("CK_BYTE[]", iv)
ck_wrapped_key = self.ffi.new("CK_BYTE[]", wrapped_key)
unwrapped_key = self.ffi.new("CK_OBJECT_HANDLE *")
def _build_key_wrap_mechanism(self, mechanism, iv):
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = CKM_AES_CBC_PAD
mech.parameter = ck_iv
mech.parameter_len = len(iv)
mech.mechanism = mechanism
if iv is not None:
mech.parameter = iv
mech.parameter_len = len(iv)
return mech
def unwrap_key(self, mechanism, wrapping_key, iv, wrapped_key, session):
ck_wrapped_key = self.ffi.new("CK_BYTE[]", wrapped_key)
ck_iv = iv and self.ffi.new("CK_BYTE[{}]".format(len(iv)), iv)
unwrapped_key = self.ffi.new("CK_OBJECT_HANDLE *")
mech = self._build_key_wrap_mechanism(
CKM_NAMES[mechanism],
ck_iv
)
ck_attributes = self._build_attributes([
Attribute(CKA_CLASS, CKO_SECRET_KEY),
@ -830,7 +875,7 @@ class PKCS11(object):
def compute_hmac(self, hmac_key, data, session):
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = self.hmac_keywrap_mechanism
mech.mechanism = self.hmac_mechanism
rv = self.lib.C_SignInit(session, mech, hmac_key)
self._check_error(rv)
@ -843,7 +888,7 @@ class PKCS11(object):
def verify_hmac(self, hmac_key, sig, data, session):
mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = self.hmac_keywrap_mechanism
mech.mechanism = self.hmac_mechanism
rv = self.lib.C_VerifyInit(session, mech, hmac_key)
self._check_error(rv)

View File

@ -13,7 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import builtins
import collections
import os
from unittest import mock
from barbican.common import exception as ex
@ -24,6 +27,11 @@ from barbican.plugin.crypto import pkcs11
from barbican.tests import utils
FakeKEKMetaDTO = collections.namedtuple(
'FakeKEKMetaDTO', 'kek_label, plugin_meta'
)
def generate_random_effect(length, session):
return b'0' * length
@ -41,7 +49,10 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
self.pkcs11.encrypt.return_value = {'iv': b'0', 'ct': b'0'}
self.pkcs11.decrypt.return_value = b'0'
self.pkcs11.generate_key.return_value = int(3)
self.pkcs11.wrap_key.return_value = {'iv': b'1', 'wrapped_key': b'1'}
self.pkcs11.wrap_key.return_value = {
'iv': b'1',
'wrapped_key': b'1',
'key_wrap_mechanism': 'CKM_AES_CBC_PAD'}
self.pkcs11.unwrap_key.return_value = int(4)
self.pkcs11.compute_hmac.return_value = b'1'
self.pkcs11.verify_hmac.return_value = None
@ -63,8 +74,11 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
self.cfg_mock.p11_crypto_plugin.encryption_mechanism = 'CKM_AES_CBC'
self.cfg_mock.p11_crypto_plugin.seed_file = ''
self.cfg_mock.p11_crypto_plugin.seed_length = 32
self.cfg_mock.p11_crypto_plugin.hmac_keywrap_mechanism = \
self.cfg_mock.p11_crypto_plugin.hmac_mechanism = \
'CKM_SHA256_HMAC'
self.cfg_mock.p11_crypto_plugin.key_wrap_mechanism = \
'CKM_AES_CBC_PAD'
self.cfg_mock.p11_crypto_plugin.key_wrap_gen_iv = True
self.plugin_name = 'Test PKCS11 plugin'
self.cfg_mock.p11_crypto_plugin.plugin_name = self.plugin_name
@ -157,7 +171,9 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
def test_decrypt(self):
ct = b'ctct'
kek_meta_extended = '{"iv":"AAAA","mechanism":"CKM_AES_CBC"}'
kek_meta_extended = ('{"iv":"AAAA",'
'"mechanism":"CKM_AES_CBC",'
'"key_wrap_mechanism":"CKM_AES_CBC_PAD"}')
decrypt_dto = plugin_import.DecryptDTO(ct)
kek_meta = mock.MagicMock()
kek_meta.kek_label = 'pkek'
@ -355,3 +371,45 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
def test_get_plugin_name(self):
self.assertEqual(self.plugin_name, self.plugin.get_plugin_name())
def test_load_kek_from_meta_dto_no_key_wrap_mechanism(self):
key = base64.b64encode(os.urandom(32)).decode('UTF-8')
hmac = base64.b64encode(os.urandom(16)).decode('UTF-8')
fake_dto = FakeKEKMetaDTO('test_kek', p11_crypto.json_dumps_compact({
'iv': None,
'wrapped_key': key,
'hmac': hmac,
'mkek_label': 'test_mkek',
'hmac_label': 'test_hmac'
}))
load_mock = mock.MagicMock()
self.plugin._load_kek = load_mock
self.plugin._load_kek_from_meta_dto(fake_dto)
# key_wrap_mechanism should default to 'CKM_AES_CBC_PAD'
load_mock.assert_called_with(
'test_kek', None, key, hmac,
'test_mkek', 'test_hmac', 'CKM_AES_CBC_PAD')
def test_load_kek_no_iv(self):
key = os.urandom(32)
wrapped = base64.b64encode(key).decode('UTF-8')
hmac = base64.b64encode(os.urandom(16)).decode('UTF-8')
self.plugin._load_kek('test_key', None, wrapped, hmac, 'mkek_label',
'hmac_label', 'CKM_AES_KEY_WRAP_KWP')
key in self.pkcs11.verify_hmac.call_args.args
def test_generate_wrapped_kek_no_iv(self):
wrapped = base64.b64encode(os.urandom(32))
self.pkcs11.wrap_key.return_value = {
'iv': None,
'wrapped_key': wrapped,
'key_wrap_mechanism': 'CKM_AES_KEY_WRAP_KWP'
}
_ = self.plugin._generate_wrapped_kek(32, 'test_kek')
wrapped in self.pkcs11.compute_hmac.call_args.args

View File

@ -59,18 +59,22 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.cfg_mock.rw_session = False
self.cfg_mock.slot_id = 1
self.cfg_mock.encryption_mechanism = 'CKM_AES_CBC'
self.cfg_mock.hmac_keywrap_mechanism = 'CKM_SHA256_HMAC'
self.cfg_mock.hmac_mechanism = 'CKM_SHA256_HMAC'
self.cfg_mock.key_wrap_mechanism = 'CKM_AES_KEY_WRAP_KWP'
self.token_mock = mock.MagicMock()
self.token_mock.label = b'myLabel'
self.token_mock.serial_number = b'111111'
self.pkcs11 = pkcs11.PKCS11(
self.cfg_mock.library_path, self.cfg_mock.login_passphrase,
self.cfg_mock.rw_session, self.cfg_mock.slot_id,
self.cfg_mock.encryption_mechanism,
self.cfg_mock.library_path,
self.cfg_mock.login_passphrase,
slot_id=self.cfg_mock.slot_id,
rw_session=self.cfg_mock.rw_session,
encryption_mechanism=self.cfg_mock.encryption_mechanism,
hmac_mechanism=self.cfg_mock.hmac_mechanism,
key_wrap_mechanism=self.cfg_mock.key_wrap_mechanism,
ffi=self.ffi,
hmac_keywrap_mechanism=self.cfg_mock.hmac_keywrap_mechanism
)
def _generate_random(self, session, buf, length):
@ -140,7 +144,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
return pkcs11.CKR_OK
def _encrypt(self, session, pt, pt_len, ct, ct_len):
if self.pkcs11.generate_iv:
if self.pkcs11.encrypt_gen_iv:
self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * self.pkcs11.gcmtagsize
else:
self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * (self.pkcs11.gcmtagsize
@ -174,6 +178,33 @@ class WhenTestingPKCS11(utils.BaseTestCase):
def _verify(self, *args, **kwargs):
return pkcs11.CKR_OK
def test_init_raises_invalid_encryption_mechanism(self):
self.assertRaises(
ValueError,
pkcs11.PKCS11,
self.cfg_mock.library_path,
self.cfg_mock.login_passphrase,
encryption_mechanism='CKM_BOGUS')
def test_init_raises_invalid_hmac_mechanism(self):
self.assertRaises(
ValueError,
pkcs11.PKCS11,
self.cfg_mock.library_path,
self.cfg_mock.login_passphrase,
encryption_mechanism='CKM_AES_GCM',
hmac_mechanism='CKM_BOGUS')
def test_init_raises_invalid_key_wrap_mechanism(self):
self.assertRaises(
ValueError,
pkcs11.PKCS11,
self.cfg_mock.library_path,
self.cfg_mock.login_passphrase,
encryption_mechanism='CKM_AES_GCM',
hmac_mechanism='CKM_SHA256_HMAC',
key_wrap_mechanism='CKM_BOGUS')
def test_get_slot_id_from_serial_number(self):
slot_id = self.pkcs11._get_slot_id('111111', None, 2)
self.assertEqual(1, slot_id)
@ -313,7 +344,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
def test_encrypt_with_no_iv_generation(self):
pt = b'0123456789ABCDEF'
self.pkcs11.generate_iv = False
self.pkcs11.encrypt_gen_iv = False
ct = self.pkcs11._VENDOR_SAFENET_CKM_AES_GCM_encrypt(
mock.MagicMock(),
pt, mock.MagicMock()
@ -328,7 +359,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
def test_encrypt_with_iv_generation(self):
pt = b'0123456789ABCDEF'
self.pkcs11.generate_iv = True
self.pkcs11.encrypt_gen_iv = True
ct = self.pkcs11._VENDOR_SAFENET_CKM_AES_GCM_encrypt(
mock.MagicMock(), pt, mock.MagicMock()
)
@ -412,7 +443,8 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.assertEqual(1, self.lib.C_DecryptInit.call_count)
self.assertEqual(1, self.lib.C_Decrypt.call_count)
def test_wrap_key(self):
def test_wrap_key_with_iv_generation(self):
self.pkcs11.key_wrap_gen_iv = True
wkek = self.pkcs11.wrap_key(mock.Mock(), mock.Mock(), mock.Mock())
self.assertGreater(len(wkek['iv']), 0)
self.assertEqual(b'0' * 16, wkek['wrapped_key'])
@ -420,9 +452,22 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.assertEqual(2, self.lib.C_GenerateRandom.call_count)
self.assertEqual(2, self.lib.C_WrapKey.call_count)
def test_wrap_key_no_iv_generation(self):
self.pkcs11.key_wrap_gen_iv = False
wkek = self.pkcs11.wrap_key(mock.Mock(), mock.Mock(), mock.Mock())
self.assertIsNone(wkek['iv'])
self.assertEqual(b'0' * 16, wkek['wrapped_key'])
self.assertEqual(1, self.lib.C_GenerateRandom.call_count)
self.assertEqual(2, self.lib.C_WrapKey.call_count)
def test_unwrap_key(self):
kek = self.pkcs11.unwrap_key(mock.Mock(), b'0' * 16,
b'0' * 16, mock.Mock())
kek = self.pkcs11.unwrap_key(
'CKM_AES_CBC_PAD',
b'0' * 16,
b'0' * 16,
b'0' * 16,
mock.Mock())
self.assertEqual(1, kek)
self.assertEqual(self.lib.C_UnwrapKey.call_count, 1)

View File

@ -91,70 +91,65 @@ and signed with HMAC key. Both MKEK and HMAC resides in the HSM.
The configuration for this plugin in ``/etc/barbican/barbican.conf``.
Settings for some different HSMs are provided below:
Thales Luna Network HSM (Safenet)
+++++++++++++++++++++++++++++++++
Thales Luna Network HSM
+++++++++++++++++++++++
The PKCS#11 plugin configuration for Luna Network HSM looks like:
.. code-block:: ini
# ================= Secret Store Plugin ===================
[secretstore]
..
enabled_secretstore_plugins = store_crypto
enable_multiple_secret_stores = True
stores_lookup_suffix = luna
# ========== Secret Store configuration ==========
[secretstore:luna]
secret_store_plugin = store_crypto
crypto_plugin = p11_crypto
# ================= Crypto plugin ===================
[crypto]
..
enabled_crypto_plugins = p11_crypto
[p11_crypto_plugin]
# Path to vendor PKCS11 library
library_path = '/usr/lib/libCryptoki2_64.so'
# Token serial number used to identify the token to be used. Required
# when the device has multiple tokens with the same label. (string
# value)
# Token serial number for the token to be used. Required
# when the device has multiple tokens with the same label.
# (string value)
#token_serial_number = 12345678
# Token label used to identify the token to be used. Required when
# Token label for the token to be used. Required when
# token_serial_number is not specified. (string value)
#token_label = <None>
token_labels = myPCKS11Token
# Password to login to PKCS11 session
# (Optional) HSM Slot ID that contains the token device to be used.
# Required when token_serial_number and token_labels are not specified.
# (integer value)
#slot_id = 0
# Password (PIN) to login to PKCS11 session
login = 'mypassword'
# Encryption algorithm used to encrypt secrets
encryption_mechanism = CKM_AES_CBC_GCM
# Label to identify master KEK in the HSM (must not be the same as HMAC label)
mkek_label = 'my_mkek_label'
# Length in bytes of master KEK
mkek_length = 32
# Label to identify HMAC key in the HSM (must not be the same as MKEK label)
# Label to identify master HMAC key in the HSM (must not be the same as MKEK label)
hmac_label = 'my_hmac_label'
# (Optional) HSM Slot ID that contains the token device to be used.
# (integer value)
slot_id = 1
# Key Type for the master HMAC key
hmac_key_type = CKK_GENERIC_SECRET
# HMAC Key Generation Algorithm used to create the master HMAC Key
hmac_keygen_mechanism = CKM_GENERIC_SECRET_KEY_GEN
# Enable Read/Write session with the HSM?
# rw_session = True
# HMAC algorith used to sign ecnrypted data
hmac_mechanism = CKM_SHA256_HMAC
# Length of Project KEKs to create
# pkek_length = 32
# Key Wrap algorithm used to wrap Project KEKs
key_wrap_mechanism = CKM_AES_KEY_WRAP_KWP
# How long to cache unwrapped Project KEKs
# pkek_cache_ttl = 900
# Max number of items in pkek cache
# pkek_cache_limit = 100
.. note::
Barbican does not support FIPS mode enabled for SafeNet Luna HSM or
Data Protection on Demand HSM. Make sure that it's operating in non-FIPS
mode while integrating with Barbican.
The HMAC and MKEK keys can be generated as follows:

View File

@ -0,0 +1,22 @@
---
deprecations:
- |
The `[p11_crypto_plugin]hmac_keywrap_mechanism` option has been replaced
by `[p11_crypto_plugin]hmac_mechanism`. This option was renamed to avoid
confusion since this mechanism is only used to sign encrypted data and
never used for key wrap encryption.
security:
- |
The PKCS#11 backend driver has been updated to support newer Key Wrap
mechanisms. New deployments should use CKM_AES_KEY_WRAP_KWP, but
CKM_AES_KEY_WRAP_PAD and CKM_AES_CBC_PAD are also supported for
compatibility with older devices that have not yet implemented PKCS#11
Version 3.0.
fixes:
- |
Fixed Bug #2036506 - This patch replaces the hard-coded CKM_AES_CBC_PAD
mechanism used to wrap pKEKs with an option to configure this mechanism.
Two new options have been added to the [p11_crypto_plugin] section of the
configuration file: `key_wrap_mechanism` and `key_wrap_generate_iv`. These
options default to `CKM_AES_CBC_PAD` and `True` respectively to preserve
backwards compatibility.