Configure mechanism for wrapping pKEKs

The PKCS#11 backend key-wraps (encrypts) the project-specific Key
Encryption Keys (pKEKs) using the master encryption key (MKEK).

The mechanism for wrapping/unwrapping the keys was hard-coded to use
CKM_AES_CBC_PAD.  This patch refactors the pkcs11 module to make this
mechanism configurable.

This is necessary to fix Bug #2036506 because some PKCS#11 devices and
software implementations no longer allow CKM_AES_CBC_PAD to be used for
key wrapping.

Supported key wrap mechanisms now include:

* CKM_AES_CBC_PAD
* CKM_AES_KEY_WRAP_PAD
* CKM_AES_KEY_WRAP_KWP

Closes-Bug: #2036506
Change-Id: Ic2009a2a55622bb707e884d6a960c044b2248f52
This commit is contained in:
Douglas Mendizábal 2024-10-25 16:45:58 -04:00 committed by Douglas Mendizabal
parent 2ec8724585
commit 0d4101fa5d
7 changed files with 295 additions and 144 deletions

View File

@ -326,13 +326,16 @@ class HSMCommands(object):
elif type(slotid) is not int: elif type(slotid) is not int:
slotid = int(slotid) slotid = int(slotid)
if hmacwrap is None: if hmacwrap is None:
hmacwrap = conf.p11_crypto_plugin.hmac_keywrap_mechanism hmacwrap = conf.p11_crypto_plugin.hmac_mechanism
self.pkcs11 = pkcs11.PKCS11( self.pkcs11 = pkcs11.PKCS11(
library_path=libpath, login_passphrase=passphrase, library_path=libpath,
rw_session=True, slot_id=slotid, login_passphrase=passphrase,
encryption_mechanism='CKM_AES_CBC', rw_session=conf.p11_crypto_plugin.rw_session,
hmac_keywrap_mechanism=hmacwrap, slot_id=slotid,
encryption_mechanism=conf.p11_crypto_plugin.encryption_mechanism,
hmac_mechanism=hmacwrap,
key_wrap_mechanism=conf.p11_crypto_plugin.key_wrap_mechanism,
token_serial_number=conf.p11_crypto_plugin.token_serial_number, token_serial_number=conf.p11_crypto_plugin.token_serial_number,
token_labels=conf.p11_crypto_plugin.token_labels token_labels=conf.p11_crypto_plugin.token_labels
) )

View File

@ -46,7 +46,7 @@ p11_crypto_plugin_opts = [
'devices may require more than one label for Load ' 'devices may require more than one label for Load '
'Balancing or High Availability configurations.')), 'Balancing or High Availability configurations.')),
cfg.StrOpt('login', cfg.StrOpt('login',
help=u._('Password to login to PKCS11 session'), help=u._('Password (PIN) to login to PKCS11 session'),
secret=True), secret=True),
cfg.StrOpt('mkek_label', cfg.StrOpt('mkek_label',
help=u._('Master KEK label (as stored in the HSM)')), help=u._('Master KEK label (as stored in the HSM)')),
@ -79,11 +79,19 @@ p11_crypto_plugin_opts = [
help=u._('HMAC Key Type'), help=u._('HMAC Key Type'),
default='CKK_AES'), default='CKK_AES'),
cfg.StrOpt('hmac_keygen_mechanism', cfg.StrOpt('hmac_keygen_mechanism',
help=u._('HMAC Key Generation Algorithm'), help=u._('HMAC Key Generation Algorithm used to create the '
'master HMAC Key.'),
default='CKM_AES_KEY_GEN'), default='CKM_AES_KEY_GEN'),
cfg.StrOpt('hmac_keywrap_mechanism', cfg.StrOpt('hmac_mechanism',
help=u._('HMAC key wrap mechanism'), help=u._('HMAC algorithm used to sign encrypted data.'),
default='CKM_SHA256_HMAC'), default='CKM_SHA256_HMAC',
deprecated_name='hmac_keywrap_mechanism'),
cfg.StrOpt('key_wrap_mechanism',
help=u._('Key Wrapping algorithm used to wrap Project KEKs.'),
default='CKM_AES_CBC_PAD'),
cfg.BoolOpt('key_wrap_generate_iv',
help=u._('Generate IVs for Key Wrapping mechanism.'),
default=True),
cfg.StrOpt('seed_file', cfg.StrOpt('seed_file',
help=u._('File to pull entropy for seeding RNG'), help=u._('File to pull entropy for seeding RNG'),
default=''), default=''),
@ -136,28 +144,31 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
if plugin_conf.library_path is None: if plugin_conf.library_path is None:
raise ValueError(u._("library_path is required")) raise ValueError(u._("library_path is required"))
self.library_path = plugin_conf.library_path self.library_path = plugin_conf.library_path
self.login = plugin_conf.login
self.token_serial_number = plugin_conf.token_serial_number
self.token_labels = plugin_conf.token_labels
self.slot_id = plugin_conf.slot_id
self.rw_session = plugin_conf.rw_session
self.seed_file = plugin_conf.seed_file
self.seed_length = plugin_conf.seed_length
self.encryption_mechanism = plugin_conf.encryption_mechanism self.encryption_mechanism = plugin_conf.encryption_mechanism
self.generate_iv = plugin_conf.aes_gcm_generate_iv self.encryption_gen_iv = plugin_conf.aes_gcm_generate_iv
self.cka_sensitive = plugin_conf.always_set_cka_sensitive self.cka_sensitive = plugin_conf.always_set_cka_sensitive
self.mkek_key_type = 'CKK_AES' self.mkek_key_type = 'CKK_AES'
self.mkek_length = plugin_conf.mkek_length self.mkek_length = plugin_conf.mkek_length
self.mkek_label = plugin_conf.mkek_label self.mkek_label = plugin_conf.mkek_label
self.hmac_label = plugin_conf.hmac_label
self.hmac_key_type = plugin_conf.hmac_key_type self.hmac_key_type = plugin_conf.hmac_key_type
self.hmac_keygen_mechanism = plugin_conf.hmac_keygen_mechanism self.hmac_label = plugin_conf.hmac_label
self.hmac_keywrap_mechanism = plugin_conf.hmac_keywrap_mechanism self.hmac_mechanism = plugin_conf.hmac_mechanism
self.key_wrap_mechanism = plugin_conf.key_wrap_mechanism
self.key_wrap_gen_iv = plugin_conf.key_wrap_generate_iv
self.os_locking_ok = plugin_conf.os_locking_ok self.os_locking_ok = plugin_conf.os_locking_ok
self.pkek_length = plugin_conf.pkek_length self.pkek_length = plugin_conf.pkek_length
self.pkek_cache_ttl = plugin_conf.pkek_cache_ttl self.pkek_cache_ttl = plugin_conf.pkek_cache_ttl
self.pkek_cache_limit = plugin_conf.pkek_cache_limit self.pkek_cache_limit = plugin_conf.pkek_cache_limit
self.rw_session = plugin_conf.rw_session
self.seed_file = plugin_conf.seed_file
self.seed_length = plugin_conf.seed_length
self.slot_id = plugin_conf.slot_id
self.login = plugin_conf.login
self.token_serial_number = plugin_conf.token_serial_number
self.token_labels = plugin_conf.token_labels
# Use specified or create new pkcs11 object # Use specified or create new pkcs11 object
self.pkcs11 = pkcs11 or self._create_pkcs11(ffi) self.pkcs11 = pkcs11 or self._create_pkcs11(ffi)
@ -339,17 +350,19 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
return pkcs11.PKCS11( return pkcs11.PKCS11(
library_path=self.library_path, library_path=self.library_path,
login_passphrase=self.login, login_passphrase=self.login,
rw_session=self.rw_session,
slot_id=self.slot_id,
encryption_mechanism=self.encryption_mechanism,
ffi=ffi,
seed_random_buffer=seed_random_buffer,
generate_iv=self.generate_iv,
always_set_cka_sensitive=self.cka_sensitive,
hmac_keywrap_mechanism=self.hmac_keywrap_mechanism,
token_serial_number=self.token_serial_number, token_serial_number=self.token_serial_number,
token_labels=self.token_labels, token_labels=self.token_labels,
os_locking_ok=self.os_locking_ok slot_id=self.slot_id,
rw_session=self.rw_session,
seed_random_buffer=seed_random_buffer,
encryption_mechanism=self.encryption_mechanism,
encryption_gen_iv=self.encryption_gen_iv,
always_set_cka_sensitive=self.cka_sensitive,
hmac_mechanism=self.hmac_mechanism,
key_wrap_mechanism=self.key_wrap_mechanism,
key_wrap_gen_iv=self.key_wrap_gen_iv,
os_locking_ok=self.os_locking_ok,
ffi=ffi
) )
def _reinitialize_pkcs11(self): def _reinitialize_pkcs11(self):
@ -390,23 +403,33 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
return key return key
def _load_kek_from_meta_dto(self, kek_meta_dto): def _load_kek_from_meta_dto(self, kek_meta_dto):
# If plugin_meta is missing the keywrap_mechanism, we default
# to the previously hard-coded CKM_AES_CBC_PAD
_DEFAULT_KEYWRAP_MECHANISM = 'CKM_AES_CBC_PAD'
meta = json.loads(kek_meta_dto.plugin_meta) meta = json.loads(kek_meta_dto.plugin_meta)
keywrap_mechanism = meta.get('key_wrap_mechanism',
_DEFAULT_KEYWRAP_MECHANISM)
LOG.debug("Key Wrap mechanism: %s", keywrap_mechanism)
kek = self._load_kek( kek = self._load_kek(
kek_meta_dto.kek_label, meta['iv'], meta['wrapped_key'], kek_meta_dto.kek_label, meta['iv'], meta['wrapped_key'],
meta['hmac'], meta['mkek_label'], meta['hmac_label'] meta['hmac'], meta['mkek_label'], meta['hmac_label'],
keywrap_mechanism
) )
return kek return kek
def _load_kek(self, key_label, iv, wrapped_key, hmac, def _load_kek(self, key_label, iv, wrapped_key, hmac,
mkek_label, hmac_label): mkek_label, hmac_label, keywrap_mechanism):
with self.pkek_cache_lock: with self.pkek_cache_lock:
kek = self._pkek_cache_get(key_label) kek = self._pkek_cache_get(key_label)
if kek is None: if kek is None:
# Decode data # Decode data
iv = base64.b64decode(iv)
wrapped_key = base64.b64decode(wrapped_key) wrapped_key = base64.b64decode(wrapped_key)
hmac = base64.b64decode(hmac) if iv is None:
kek_data = wrapped_key
else:
iv = base64.b64decode(iv)
kek_data = iv + wrapped_key kek_data = iv + wrapped_key
hmac = base64.b64decode(hmac)
with self.caching_session_lock: with self.caching_session_lock:
session = self.caching_session session = self.caching_session
@ -418,7 +441,11 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
self.pkcs11.verify_hmac(mkhk, hmac, kek_data, session) self.pkcs11.verify_hmac(mkhk, hmac, kek_data, session)
# Unwrap KEK # Unwrap KEK
kek = self.pkcs11.unwrap_key(mkek, iv, wrapped_key, kek = self.pkcs11.unwrap_key(
keywrap_mechanism,
mkek,
iv,
wrapped_key,
session) session)
self._pkek_cache_add(kek, key_label) self._pkek_cache_add(kek, key_label)
@ -441,16 +468,21 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
wkek = self.pkcs11.wrap_key(mkek, kek, session) wkek = self.pkcs11.wrap_key(mkek, kek, session)
# HMAC Wrapped KEK # HMAC Wrapped KEK
if wkek['iv'] is None:
wkek_data = wkek['wrapped_key']
else:
wkek_data = wkek['iv'] + wkek['wrapped_key'] wkek_data = wkek['iv'] + wkek['wrapped_key']
wkek_hmac = self.pkcs11.compute_hmac(mkhk, wkek_data, session) wkek_hmac = self.pkcs11.compute_hmac(mkhk, wkek_data, session)
# Cache KEK # Cache KEK
self._pkek_cache_add(kek, key_label) self._pkek_cache_add(kek, key_label)
return { return {
'iv': base64.b64encode(wkek['iv']), 'iv': wkek['iv'] and base64.b64encode(wkek['iv']),
'wrapped_key': base64.b64encode(wkek['wrapped_key']), 'wrapped_key': base64.b64encode(wkek['wrapped_key']),
'hmac': base64.b64encode(wkek_hmac), 'hmac': base64.b64encode(wkek_hmac),
'mkek_label': self.mkek_label, 'mkek_label': self.mkek_label,
'hmac_label': self.hmac_label 'hmac_label': self.hmac_label,
'key_wrap_mechanism': wkek['key_wrap_mechanism']
} }

View File

@ -141,7 +141,8 @@ CKM_AES_CBC = 0x1082
CKM_AES_MAC = 0x1083 CKM_AES_MAC = 0x1083
CKM_AES_CBC_PAD = 0x1085 CKM_AES_CBC_PAD = 0x1085
CKM_AES_GCM = 0x1087 CKM_AES_GCM = 0x1087
CKM_AES_KEY_WRAP = 0x1090 CKM_AES_KEY_WRAP_PAD = 0x210A
CKM_AES_KEY_WRAP_KWP = 0x210B
CKM_GENERIC_SECRET_KEY_GEN = 0x350 CKM_GENERIC_SECRET_KEY_GEN = 0x350
VENDOR_SAFENET_CKM_AES_GCM = 0x8000011c VENDOR_SAFENET_CKM_AES_GCM = 0x8000011c
@ -157,20 +158,30 @@ _ENCRYPTION_MECHANISMS = {
_CBC_IV_SIZE = 16 # bytes _CBC_IV_SIZE = 16 # bytes
_CBC_BLOCK_SIZE = 128 # bits _CBC_BLOCK_SIZE = 128 # bits
# ----- Supported Mechanisms -----
# Barbican only supports the PKCS#11 mechanisms below
_KEY_GEN_MECHANISMS = { _KEY_GEN_MECHANISMS = {
'CKM_AES_KEY_GEN': CKM_AES_KEY_GEN, 'CKM_AES_KEY_GEN': CKM_AES_KEY_GEN,
'CKM_NC_SHA256_HMAC_KEY_GEN': CKM_NC_SHA256_HMAC_KEY_GEN, 'CKM_NC_SHA256_HMAC_KEY_GEN': CKM_NC_SHA256_HMAC_KEY_GEN,
'CKM_GENERIC_SECRET_KEY_GEN': CKM_GENERIC_SECRET_KEY_GEN, 'CKM_GENERIC_SECRET_KEY_GEN': CKM_GENERIC_SECRET_KEY_GEN,
} }
_KEY_WRAP_MECHANISMS = { _HMAC_MECHANISMS = {
'CKM_SHA256_HMAC': CKM_SHA256_HMAC, 'CKM_SHA256_HMAC': CKM_SHA256_HMAC,
'CKM_AES_MAC': CKM_AES_MAC 'CKM_AES_MAC': CKM_AES_MAC
} }
_KEY_WRAP_MECHANISMS = {
'CKM_AES_CBC_PAD': CKM_AES_CBC_PAD,
'CKM_AES_KEY_WRAP_PAD': CKM_AES_KEY_WRAP_PAD,
'CKM_AES_KEY_WRAP_KWP': CKM_AES_KEY_WRAP_KWP
}
CKM_NAMES = dict() CKM_NAMES = dict()
CKM_NAMES.update(_ENCRYPTION_MECHANISMS) CKM_NAMES.update(_ENCRYPTION_MECHANISMS)
CKM_NAMES.update(_KEY_GEN_MECHANISMS) CKM_NAMES.update(_KEY_GEN_MECHANISMS)
CKM_NAMES.update(_HMAC_MECHANISMS)
CKM_NAMES.update(_KEY_WRAP_MECHANISMS) CKM_NAMES.update(_KEY_WRAP_MECHANISMS)
ERROR_CODES = { ERROR_CODES = {
@ -324,6 +335,16 @@ def build_ffi():
CK_BYTE minor; CK_BYTE minor;
} CK_VERSION; } CK_VERSION;
typedef struct CK_INFO {
CK_VERSION cryptokiVersion;
CK_UTF8CHAR manufacturerID[32];
CK_FLAGS flags;
CK_UTF8CHAR libraryDescription[32];
CK_VERSION libraryVersion;
} CK_INFO;
typedef CK_INFO * CK_INFO_PTR;
typedef struct CK_SLOT_INFO { typedef struct CK_SLOT_INFO {
CK_UTF8CHAR slotDescription[64]; CK_UTF8CHAR slotDescription[64];
CK_UTF8CHAR manufacturerID[32]; CK_UTF8CHAR manufacturerID[32];
@ -384,6 +405,7 @@ def build_ffi():
CK_RV C_GetSessionInfo(CK_SESSION_HANDLE, CK_SESSION_INFO_PTR); CK_RV C_GetSessionInfo(CK_SESSION_HANDLE, CK_SESSION_INFO_PTR);
CK_RV C_Login(CK_SESSION_HANDLE, CK_USER_TYPE, CK_UTF8CHAR_PTR, CK_RV C_Login(CK_SESSION_HANDLE, CK_USER_TYPE, CK_UTF8CHAR_PTR,
CK_ULONG); CK_ULONG);
CK_RV C_GetInfo(CK_INFO_PTR);
CK_RV C_GetSlotList(CK_BBOOL, CK_SLOT_ID_PTR, CK_ULONG_PTR); CK_RV C_GetSlotList(CK_BBOOL, CK_SLOT_ID_PTR, CK_ULONG_PTR);
CK_RV C_GetSlotInfo(CK_SLOT_ID, CK_SLOT_INFO_PTR); CK_RV C_GetSlotInfo(CK_SLOT_ID, CK_SLOT_INFO_PTR);
CK_RV C_GetTokenInfo(CK_SLOT_ID, CK_TOKEN_INFO_PTR); CK_RV C_GetTokenInfo(CK_SLOT_ID, CK_TOKEN_INFO_PTR);
@ -426,41 +448,31 @@ def build_ffi():
class PKCS11(object): class PKCS11(object):
def __init__(self, library_path, login_passphrase, rw_session, slot_id, def __init__(self, library_path, login_passphrase,
encryption_mechanism=None,
ffi=None, algorithm=None,
seed_random_buffer=None,
generate_iv=None, always_set_cka_sensitive=None,
hmac_keywrap_mechanism='CKM_SHA256_HMAC',
token_serial_number=None, token_serial_number=None,
token_labels=None, token_labels=None,
os_locking_ok=False): slot_id=None,
if algorithm: rw_session=True,
LOG.warning("WARNING: Using deprecated 'algorithm' argument.") seed_random_buffer=None,
encryption_mechanism = encryption_mechanism or algorithm encryption_mechanism=None,
encryption_gen_iv=True,
always_set_cka_sensitive=True,
hmac_mechanism=None,
key_wrap_mechanism=None,
key_wrap_gen_iv=False,
os_locking_ok=False,
ffi=None):
# Validate all mechanisms are supported
if encryption_mechanism not in _ENCRYPTION_MECHANISMS: if encryption_mechanism not in _ENCRYPTION_MECHANISMS:
raise ValueError("Invalid encryption_mechanism.") raise ValueError("Invalid Encryption mechanism.")
self.encrypt_mech = _ENCRYPTION_MECHANISMS[encryption_mechanism] if hmac_mechanism not in _HMAC_MECHANISMS:
self.encrypt = getattr( raise ValueError("Invalid HMAC signing mechanism.")
self, if key_wrap_mechanism not in _KEY_WRAP_MECHANISMS:
'_{}_encrypt'.format(encryption_mechanism) raise ValueError("Invalid Key Wrapping mechanism.")
)
if hmac_keywrap_mechanism not in _KEY_WRAP_MECHANISMS:
raise ValueError("Invalid HMAC keywrap mechanism")
self.ffi = ffi or build_ffi() self.ffi = ffi or build_ffi()
self.lib = self.ffi.dlopen(library_path) self.lib = self.ffi.dlopen(library_path)
self._initialize_library(os_locking_ok)
if os_locking_ok:
init_arg_pt = self.ffi.new("CK_C_INITIALIZE_ARGS *")
init_arg_pt.flags = CKF_OS_LOCKING_OK
else:
init_arg_pt = self.ffi.NULL
rv = self.lib.C_Initialize(init_arg_pt)
self._check_error(rv)
# Session options # Session options
self.login_passphrase = _to_bytes(login_passphrase) self.login_passphrase = _to_bytes(login_passphrase)
@ -471,13 +483,19 @@ class PKCS11(object):
slot_id) slot_id)
# Algorithm options # Algorithm options
self.algorithm = CKM_NAMES[encryption_mechanism] self.encrypt_mech = CKM_NAMES[encryption_mechanism]
self.encrypt = getattr(
self,
'_{}_encrypt'.format(encryption_mechanism)
)
self.encrypt_gen_iv = encryption_gen_iv
self.blocksize = 16 self.blocksize = 16
self.noncesize = 12 self.noncesize = 12
self.gcmtagsize = 16 self.gcmtagsize = 16
self.generate_iv = generate_iv
self.always_set_cka_sensitive = always_set_cka_sensitive self.always_set_cka_sensitive = always_set_cka_sensitive
self.hmac_keywrap_mechanism = CKM_NAMES[hmac_keywrap_mechanism] self.hmac_mechanism = CKM_NAMES[hmac_mechanism]
self.key_wrap_mechanism = key_wrap_mechanism
self.key_wrap_gen_iv = key_wrap_gen_iv
# Validate configuration and RNG # Validate configuration and RNG
session = self.get_session() session = self.get_session()
@ -487,6 +505,16 @@ class PKCS11(object):
self.return_session(session) self.return_session(session)
LOG.debug("Connected to PCKS#11 Token in Slot %s", self.slot_id) LOG.debug("Connected to PCKS#11 Token in Slot %s", self.slot_id)
def _initialize_library(self, os_locking_ok):
if os_locking_ok:
init_arg_pt = self.ffi.new("CK_C_INITIALIZE_ARGS *")
init_arg_pt.flags = CKF_OS_LOCKING_OK
else:
init_arg_pt = self.ffi.NULL
rv = self.lib.C_Initialize(init_arg_pt)
self._check_error(rv)
def _get_slot_id(self, token_serial_number, token_labels, slot_id): def _get_slot_id(self, token_serial_number, token_labels, slot_id):
# First find out how many slots with tokens are available # First find out how many slots with tokens are available
slots_ptr = self.ffi.new("CK_ULONG_PTR") slots_ptr = self.ffi.new("CK_ULONG_PTR")
@ -640,14 +668,14 @@ class PKCS11(object):
def _VENDOR_SAFENET_CKM_AES_GCM_encrypt(self, key, pt_data, session): def _VENDOR_SAFENET_CKM_AES_GCM_encrypt(self, key, pt_data, session):
iv = None iv = None
if self.generate_iv: if self.encrypt_gen_iv:
iv = self._generate_random(self.noncesize, session) iv = self._generate_random(self.noncesize, session)
ck_mechanism = self._build_gcm_mechanism(iv) ck_mechanism = self._build_gcm_mechanism(iv)
rv = self.lib.C_EncryptInit(session, ck_mechanism.mech, key) rv = self.lib.C_EncryptInit(session, ck_mechanism.mech, key)
self._check_error(rv) self._check_error(rv)
pt_len = len(pt_data) pt_len = len(pt_data)
if self.generate_iv: if self.encrypt_gen_iv:
ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize) ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize)
else: else:
ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize * 2) ct_len = self.ffi.new("CK_ULONG *", pt_len + self.gcmtagsize * 2)
@ -655,7 +683,7 @@ class PKCS11(object):
rv = self.lib.C_Encrypt(session, pt_data, pt_len, ct, ct_len) rv = self.lib.C_Encrypt(session, pt_data, pt_len, ct, ct_len)
self._check_error(rv) self._check_error(rv)
if self.generate_iv: if self.encrypt_gen_iv:
return { return {
"iv": self.ffi.buffer(iv)[:], "iv": self.ffi.buffer(iv)[:],
"ct": self.ffi.buffer(ct, ct_len[0])[:] "ct": self.ffi.buffer(ct, ct_len[0])[:]
@ -669,7 +697,7 @@ class PKCS11(object):
def _build_gcm_mechanism(self, iv=None): def _build_gcm_mechanism(self, iv=None):
mech = self.ffi.new("CK_MECHANISM *") mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = self.algorithm mech.mechanism = self.encrypt_mech
gcm = self.ffi.new("CK_AES_GCM_PARAMS *") gcm = self.ffi.new("CK_AES_GCM_PARAMS *")
if iv: if iv:
@ -774,11 +802,19 @@ class PKCS11(object):
return obj_handle_ptr[0] return obj_handle_ptr[0]
def wrap_key(self, wrapping_key, key_to_wrap, session): def wrap_key(self, wrapping_key, key_to_wrap, session):
mech = self.ffi.new("CK_MECHANISM *") if self.key_wrap_gen_iv:
mech.mechanism = CKM_AES_CBC_PAD iv_len = {
iv = self._generate_random(16, session) 'CKM_AES_CBC_PAD': 16, # bytes
mech.parameter = iv 'CKM_AES_WRAP_PAD': 8, # bytes
mech.parameter_len = 16 'CKM_AES_KEY_WRAP_KWP': 4 # bytes
}.get(self.key_wrap_mechanism)
iv = self._generate_random(iv_len, session)
else:
iv = None
mech = self._build_key_wrap_mechanism(
CKM_NAMES[self.key_wrap_mechanism],
iv
)
# Ask for length of the wrapped key # Ask for length of the wrapped key
wrapped_key_len = self.ffi.new("CK_ULONG *") wrapped_key_len = self.ffi.new("CK_ULONG *")
@ -797,18 +833,27 @@ class PKCS11(object):
self._check_error(rv) self._check_error(rv)
return { return {
'iv': self.ffi.buffer(iv)[:], 'iv': iv and self.ffi.buffer(iv)[:],
'wrapped_key': self.ffi.buffer(wrapped_key, wrapped_key_len[0])[:] 'wrapped_key': self.ffi.buffer(wrapped_key, wrapped_key_len[0])[:],
'key_wrap_mechanism': self.key_wrap_mechanism
} }
def unwrap_key(self, wrapping_key, iv, wrapped_key, session): def _build_key_wrap_mechanism(self, mechanism, iv):
ck_iv = self.ffi.new("CK_BYTE[]", iv)
ck_wrapped_key = self.ffi.new("CK_BYTE[]", wrapped_key)
unwrapped_key = self.ffi.new("CK_OBJECT_HANDLE *")
mech = self.ffi.new("CK_MECHANISM *") mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = CKM_AES_CBC_PAD mech.mechanism = mechanism
mech.parameter = ck_iv if iv is not None:
mech.parameter = iv
mech.parameter_len = len(iv) mech.parameter_len = len(iv)
return mech
def unwrap_key(self, mechanism, wrapping_key, iv, wrapped_key, session):
ck_wrapped_key = self.ffi.new("CK_BYTE[]", wrapped_key)
ck_iv = iv and self.ffi.new("CK_BYTE[{}]".format(len(iv)), iv)
unwrapped_key = self.ffi.new("CK_OBJECT_HANDLE *")
mech = self._build_key_wrap_mechanism(
CKM_NAMES[mechanism],
ck_iv
)
ck_attributes = self._build_attributes([ ck_attributes = self._build_attributes([
Attribute(CKA_CLASS, CKO_SECRET_KEY), Attribute(CKA_CLASS, CKO_SECRET_KEY),
@ -830,7 +875,7 @@ class PKCS11(object):
def compute_hmac(self, hmac_key, data, session): def compute_hmac(self, hmac_key, data, session):
mech = self.ffi.new("CK_MECHANISM *") mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = self.hmac_keywrap_mechanism mech.mechanism = self.hmac_mechanism
rv = self.lib.C_SignInit(session, mech, hmac_key) rv = self.lib.C_SignInit(session, mech, hmac_key)
self._check_error(rv) self._check_error(rv)
@ -843,7 +888,7 @@ class PKCS11(object):
def verify_hmac(self, hmac_key, sig, data, session): def verify_hmac(self, hmac_key, sig, data, session):
mech = self.ffi.new("CK_MECHANISM *") mech = self.ffi.new("CK_MECHANISM *")
mech.mechanism = self.hmac_keywrap_mechanism mech.mechanism = self.hmac_mechanism
rv = self.lib.C_VerifyInit(session, mech, hmac_key) rv = self.lib.C_VerifyInit(session, mech, hmac_key)
self._check_error(rv) self._check_error(rv)

View File

@ -13,7 +13,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import base64
import builtins import builtins
import collections
import os
from unittest import mock from unittest import mock
from barbican.common import exception as ex from barbican.common import exception as ex
@ -24,6 +27,11 @@ from barbican.plugin.crypto import pkcs11
from barbican.tests import utils from barbican.tests import utils
FakeKEKMetaDTO = collections.namedtuple(
'FakeKEKMetaDTO', 'kek_label, plugin_meta'
)
def generate_random_effect(length, session): def generate_random_effect(length, session):
return b'0' * length return b'0' * length
@ -41,7 +49,10 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
self.pkcs11.encrypt.return_value = {'iv': b'0', 'ct': b'0'} self.pkcs11.encrypt.return_value = {'iv': b'0', 'ct': b'0'}
self.pkcs11.decrypt.return_value = b'0' self.pkcs11.decrypt.return_value = b'0'
self.pkcs11.generate_key.return_value = int(3) self.pkcs11.generate_key.return_value = int(3)
self.pkcs11.wrap_key.return_value = {'iv': b'1', 'wrapped_key': b'1'} self.pkcs11.wrap_key.return_value = {
'iv': b'1',
'wrapped_key': b'1',
'key_wrap_mechanism': 'CKM_AES_CBC_PAD'}
self.pkcs11.unwrap_key.return_value = int(4) self.pkcs11.unwrap_key.return_value = int(4)
self.pkcs11.compute_hmac.return_value = b'1' self.pkcs11.compute_hmac.return_value = b'1'
self.pkcs11.verify_hmac.return_value = None self.pkcs11.verify_hmac.return_value = None
@ -63,8 +74,11 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
self.cfg_mock.p11_crypto_plugin.encryption_mechanism = 'CKM_AES_CBC' self.cfg_mock.p11_crypto_plugin.encryption_mechanism = 'CKM_AES_CBC'
self.cfg_mock.p11_crypto_plugin.seed_file = '' self.cfg_mock.p11_crypto_plugin.seed_file = ''
self.cfg_mock.p11_crypto_plugin.seed_length = 32 self.cfg_mock.p11_crypto_plugin.seed_length = 32
self.cfg_mock.p11_crypto_plugin.hmac_keywrap_mechanism = \ self.cfg_mock.p11_crypto_plugin.hmac_mechanism = \
'CKM_SHA256_HMAC' 'CKM_SHA256_HMAC'
self.cfg_mock.p11_crypto_plugin.key_wrap_mechanism = \
'CKM_AES_CBC_PAD'
self.cfg_mock.p11_crypto_plugin.key_wrap_gen_iv = True
self.plugin_name = 'Test PKCS11 plugin' self.plugin_name = 'Test PKCS11 plugin'
self.cfg_mock.p11_crypto_plugin.plugin_name = self.plugin_name self.cfg_mock.p11_crypto_plugin.plugin_name = self.plugin_name
@ -157,7 +171,9 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
def test_decrypt(self): def test_decrypt(self):
ct = b'ctct' ct = b'ctct'
kek_meta_extended = '{"iv":"AAAA","mechanism":"CKM_AES_CBC"}' kek_meta_extended = ('{"iv":"AAAA",'
'"mechanism":"CKM_AES_CBC",'
'"key_wrap_mechanism":"CKM_AES_CBC_PAD"}')
decrypt_dto = plugin_import.DecryptDTO(ct) decrypt_dto = plugin_import.DecryptDTO(ct)
kek_meta = mock.MagicMock() kek_meta = mock.MagicMock()
kek_meta.kek_label = 'pkek' kek_meta.kek_label = 'pkek'
@ -355,3 +371,23 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
def test_get_plugin_name(self): def test_get_plugin_name(self):
self.assertEqual(self.plugin_name, self.plugin.get_plugin_name()) self.assertEqual(self.plugin_name, self.plugin.get_plugin_name())
def test_load_kek_from_meta_dto_no_key_wrap_mechanism(self):
key = base64.b64encode(os.urandom(32)).decode('UTF-8')
hmac = base64.b64encode(os.urandom(16)).decode('UTF-8')
fake_dto = FakeKEKMetaDTO('test_kek', p11_crypto.json_dumps_compact({
'iv': None,
'wrapped_key': key,
'hmac': hmac,
'mkek_label': 'test_mkek',
'hmac_label': 'test_hmac'
}))
load_mock = mock.MagicMock()
self.plugin._load_kek = load_mock
self.plugin._load_kek_from_meta_dto(fake_dto)
# key_wrap_mechanism should default to 'CKM_AES_CBC_PAD'
load_mock.assert_called_with(
'test_kek', None, key, hmac,
'test_mkek', 'test_hmac', 'CKM_AES_CBC_PAD')

View File

@ -59,18 +59,22 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.cfg_mock.rw_session = False self.cfg_mock.rw_session = False
self.cfg_mock.slot_id = 1 self.cfg_mock.slot_id = 1
self.cfg_mock.encryption_mechanism = 'CKM_AES_CBC' self.cfg_mock.encryption_mechanism = 'CKM_AES_CBC'
self.cfg_mock.hmac_keywrap_mechanism = 'CKM_SHA256_HMAC' self.cfg_mock.hmac_mechanism = 'CKM_SHA256_HMAC'
self.cfg_mock.key_wrap_mechanism = 'CKM_AES_KEY_WRAP_KWP'
self.token_mock = mock.MagicMock() self.token_mock = mock.MagicMock()
self.token_mock.label = b'myLabel' self.token_mock.label = b'myLabel'
self.token_mock.serial_number = b'111111' self.token_mock.serial_number = b'111111'
self.pkcs11 = pkcs11.PKCS11( self.pkcs11 = pkcs11.PKCS11(
self.cfg_mock.library_path, self.cfg_mock.login_passphrase, self.cfg_mock.library_path,
self.cfg_mock.rw_session, self.cfg_mock.slot_id, self.cfg_mock.login_passphrase,
self.cfg_mock.encryption_mechanism, slot_id=self.cfg_mock.slot_id,
rw_session=self.cfg_mock.rw_session,
encryption_mechanism=self.cfg_mock.encryption_mechanism,
hmac_mechanism=self.cfg_mock.hmac_mechanism,
key_wrap_mechanism=self.cfg_mock.key_wrap_mechanism,
ffi=self.ffi, ffi=self.ffi,
hmac_keywrap_mechanism=self.cfg_mock.hmac_keywrap_mechanism
) )
def _generate_random(self, session, buf, length): def _generate_random(self, session, buf, length):
@ -140,7 +144,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
return pkcs11.CKR_OK return pkcs11.CKR_OK
def _encrypt(self, session, pt, pt_len, ct, ct_len): def _encrypt(self, session, pt, pt_len, ct, ct_len):
if self.pkcs11.generate_iv: if self.pkcs11.encrypt_gen_iv:
self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * self.pkcs11.gcmtagsize self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * self.pkcs11.gcmtagsize
else: else:
self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * (self.pkcs11.gcmtagsize self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * (self.pkcs11.gcmtagsize
@ -313,7 +317,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
def test_encrypt_with_no_iv_generation(self): def test_encrypt_with_no_iv_generation(self):
pt = b'0123456789ABCDEF' pt = b'0123456789ABCDEF'
self.pkcs11.generate_iv = False self.pkcs11.encrypt_gen_iv = False
ct = self.pkcs11._VENDOR_SAFENET_CKM_AES_GCM_encrypt( ct = self.pkcs11._VENDOR_SAFENET_CKM_AES_GCM_encrypt(
mock.MagicMock(), mock.MagicMock(),
pt, mock.MagicMock() pt, mock.MagicMock()
@ -328,7 +332,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
def test_encrypt_with_iv_generation(self): def test_encrypt_with_iv_generation(self):
pt = b'0123456789ABCDEF' pt = b'0123456789ABCDEF'
self.pkcs11.generate_iv = True self.pkcs11.encrypt_gen_iv = True
ct = self.pkcs11._VENDOR_SAFENET_CKM_AES_GCM_encrypt( ct = self.pkcs11._VENDOR_SAFENET_CKM_AES_GCM_encrypt(
mock.MagicMock(), pt, mock.MagicMock() mock.MagicMock(), pt, mock.MagicMock()
) )
@ -412,7 +416,8 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.assertEqual(1, self.lib.C_DecryptInit.call_count) self.assertEqual(1, self.lib.C_DecryptInit.call_count)
self.assertEqual(1, self.lib.C_Decrypt.call_count) self.assertEqual(1, self.lib.C_Decrypt.call_count)
def test_wrap_key(self): def test_wrap_key_with_iv_generation(self):
self.pkcs11.key_wrap_gen_iv = True
wkek = self.pkcs11.wrap_key(mock.Mock(), mock.Mock(), mock.Mock()) wkek = self.pkcs11.wrap_key(mock.Mock(), mock.Mock(), mock.Mock())
self.assertGreater(len(wkek['iv']), 0) self.assertGreater(len(wkek['iv']), 0)
self.assertEqual(b'0' * 16, wkek['wrapped_key']) self.assertEqual(b'0' * 16, wkek['wrapped_key'])
@ -420,9 +425,22 @@ class WhenTestingPKCS11(utils.BaseTestCase):
self.assertEqual(2, self.lib.C_GenerateRandom.call_count) self.assertEqual(2, self.lib.C_GenerateRandom.call_count)
self.assertEqual(2, self.lib.C_WrapKey.call_count) self.assertEqual(2, self.lib.C_WrapKey.call_count)
def test_wrap_key_no_iv_generation(self):
self.pkcs11.key_wrap_gen_iv = False
wkek = self.pkcs11.wrap_key(mock.Mock(), mock.Mock(), mock.Mock())
self.assertIsNone(wkek['iv'])
self.assertEqual(b'0' * 16, wkek['wrapped_key'])
self.assertEqual(1, self.lib.C_GenerateRandom.call_count)
self.assertEqual(2, self.lib.C_WrapKey.call_count)
def test_unwrap_key(self): def test_unwrap_key(self):
kek = self.pkcs11.unwrap_key(mock.Mock(), b'0' * 16, kek = self.pkcs11.unwrap_key(
b'0' * 16, mock.Mock()) 'CKM_AES_CBC_PAD',
b'0' * 16,
b'0' * 16,
b'0' * 16,
mock.Mock())
self.assertEqual(1, kek) self.assertEqual(1, kek)
self.assertEqual(self.lib.C_UnwrapKey.call_count, 1) self.assertEqual(self.lib.C_UnwrapKey.call_count, 1)

View File

@ -91,70 +91,65 @@ and signed with HMAC key. Both MKEK and HMAC resides in the HSM.
The configuration for this plugin in ``/etc/barbican/barbican.conf``. The configuration for this plugin in ``/etc/barbican/barbican.conf``.
Settings for some different HSMs are provided below: Settings for some different HSMs are provided below:
Thales Luna Network HSM (Safenet) Thales Luna Network HSM
+++++++++++++++++++++++++++++++++ +++++++++++++++++++++++
The PKCS#11 plugin configuration for Luna Network HSM looks like: The PKCS#11 plugin configuration for Luna Network HSM looks like:
.. code-block:: ini .. code-block:: ini
# ================= Secret Store Plugin ===================
[secretstore] [secretstore]
.. enable_multiple_secret_stores = True
enabled_secretstore_plugins = store_crypto stores_lookup_suffix = luna
# ========== Secret Store configuration ==========
[secretstore:luna]
secret_store_plugin = store_crypto
crypto_plugin = p11_crypto
# ================= Crypto plugin =================== # ================= Crypto plugin ===================
[crypto]
..
enabled_crypto_plugins = p11_crypto
[p11_crypto_plugin] [p11_crypto_plugin]
# Path to vendor PKCS11 library # Path to vendor PKCS11 library
library_path = '/usr/lib/libCryptoki2_64.so' library_path = '/usr/lib/libCryptoki2_64.so'
# Token serial number used to identify the token to be used. Required # Token serial number for the token to be used. Required
# when the device has multiple tokens with the same label. (string # when the device has multiple tokens with the same label.
# value) # (string value)
#token_serial_number = 12345678 #token_serial_number = 12345678
# Token label used to identify the token to be used. Required when # Token label for the token to be used. Required when
# token_serial_number is not specified. (string value) # token_serial_number is not specified. (string value)
#token_label = <None> token_labels = myPCKS11Token
# Password to login to PKCS11 session # (Optional) HSM Slot ID that contains the token device to be used.
# Required when token_serial_number and token_labels are not specified.
# (integer value)
#slot_id = 0
# Password (PIN) to login to PKCS11 session
login = 'mypassword' login = 'mypassword'
# Encryption algorithm used to encrypt secrets
encryption_mechanism = CKM_AES_CBC_GCM
# Label to identify master KEK in the HSM (must not be the same as HMAC label) # Label to identify master KEK in the HSM (must not be the same as HMAC label)
mkek_label = 'my_mkek_label' mkek_label = 'my_mkek_label'
# Length in bytes of master KEK # Label to identify master HMAC key in the HSM (must not be the same as MKEK label)
mkek_length = 32
# Label to identify HMAC key in the HSM (must not be the same as MKEK label)
hmac_label = 'my_hmac_label' hmac_label = 'my_hmac_label'
# (Optional) HSM Slot ID that contains the token device to be used. # Key Type for the master HMAC key
# (integer value) hmac_key_type = CKK_GENERIC_SECRET
slot_id = 1
# HMAC Key Generation Algorithm used to create the master HMAC Key
hmac_keygen_mechanism = CKM_GENERIC_SECRET_KEY_GEN
# Enable Read/Write session with the HSM? # HMAC algorith used to sign ecnrypted data
# rw_session = True hmac_mechanism = CKM_SHA256_HMAC
# Length of Project KEKs to create # Key Wrap algorithm used to wrap Project KEKs
# pkek_length = 32 key_wrap_mechanism = CKM_AES_KEY_WRAP_KWP
# How long to cache unwrapped Project KEKs
# pkek_cache_ttl = 900
# Max number of items in pkek cache
# pkek_cache_limit = 100
.. note::
Barbican does not support FIPS mode enabled for SafeNet Luna HSM or
Data Protection on Demand HSM. Make sure that it's operating in non-FIPS
mode while integrating with Barbican.
The HMAC and MKEK keys can be generated as follows: The HMAC and MKEK keys can be generated as follows:

View File

@ -0,0 +1,22 @@
---
deprecations:
- |
The `[p11_crypto_plugin]hmac_keywrap_mechanism` option has been replaced
by `[p11_crypto_plugin]hmac_mechanism`. This option was renamed to avoid
confusion since this mechanism is only used to sign encrypted data and
never used for key wrap encryption.
security:
- |
The PKCS#11 backend driver has been updated to support newer Key Wrap
mechanisms. New deployments should use CKM_AES_KEY_WRAP_KWP, but
CKM_AES_KEY_WRAP_PAD and CKM_AES_CBC_PAD are also supported for
compatibility with older devices that have not yet implemented PKCS#11
Version 3.0.
fixes:
- |
Fixed Bug #2036506 - This patch replaces the hard-coded CKM_AES_CBC_PAD
mechanism used to wrap pKEKs with an option to configure this mechanism.
Two new options have been added to the [p11_crypto_plugin] section of the
configuration file: `key_wrap_mechanism` and `key_wrap_generate_iv`. These
options default to `CKM_AES_CBC_PAD` and `True` respectively to preserve
backwards compatibility.