PKCS11 refactor to use a master KEK and per project KEK

The plugin now supports rotating master KEK and verifies integrity of
project KEKs via HMAC (GCM is unavailable as a mechanism for Wrap/Unwrap
operations). The master KEK and HMAC key are generated as separate keys
in the HSM.

This key wrapping approach is taken because most HSMs do not have the
ability to use KDFs such as HKDF while keeping the derived key material
within the HSM.

Implements: blueprint restructure-pkcs11-plugin
Change-Id: Ic777ba0484cdbe71d6ee00fa33f8b4c9fc430e00
This commit is contained in:
Paul Kehrer 2014-09-05 22:12:36 -05:00
parent cb9065c309
commit e9541942a2
3 changed files with 293 additions and 91 deletions

View File

@ -21,12 +21,14 @@ import base64
from oslo.config import cfg from oslo.config import cfg
from barbican.common import exception from barbican.common import exception
from barbican.common import utils
from barbican.openstack.common import gettextutils as u from barbican.openstack.common import gettextutils as u
from barbican.openstack.common import jsonutils as json from barbican.openstack.common import jsonutils as json
from barbican.plugin.crypto import crypto as plugin from barbican.plugin.crypto import crypto as plugin
CONF = cfg.CONF CONF = cfg.CONF
LOG = utils.getLogger(__name__)
p11_crypto_plugin_group = cfg.OptGroup(name='p11_crypto_plugin', p11_crypto_plugin_group = cfg.OptGroup(name='p11_crypto_plugin',
title="PKCS11 Crypto Plugin Options") title="PKCS11 Crypto Plugin Options")
@ -34,7 +36,13 @@ p11_crypto_plugin_opts = [
cfg.StrOpt('library_path', cfg.StrOpt('library_path',
help=u._('Path to vendor PKCS11 library')), help=u._('Path to vendor PKCS11 library')),
cfg.StrOpt('login', cfg.StrOpt('login',
help=u._('Password to login to PKCS11 session')) help=u._('Password to login to PKCS11 session')),
cfg.StrOpt('mkek_label',
help=u._('Master KEK label (used in the HSM)')),
cfg.IntOpt('mkek_length',
help=u._('Master KEK length in bytes.')),
cfg.StrOpt('hmac_label',
help=u._('HMAC label (used in the HSM)')),
] ]
CONF.register_group(p11_crypto_plugin_group) CONF.register_group(p11_crypto_plugin_group)
CONF.register_opts(p11_crypto_plugin_opts, group=p11_crypto_plugin_group) CONF.register_opts(p11_crypto_plugin_opts, group=p11_crypto_plugin_group)
@ -51,13 +59,16 @@ class P11CryptoPluginException(exception.BarbicanException):
class P11CryptoPlugin(plugin.CryptoPluginBase): class P11CryptoPlugin(plugin.CryptoPluginBase):
"""PKCS11 supporting implementation of the crypto plugin. """PKCS11 supporting implementation of the crypto plugin.
Generates a key per tenant and encrypts using AES-256-GCM. Generates a single master key and a single HMAC key that remain in the
HSM, then generates a key per tenant in the HSM, wraps the key, computes
an HMAC, and stores it in the DB. The tenant key is never unencrypted
outside the HSM.
This implementation currently relies on an unreleased fork of PyKCS11. This implementation currently relies on an unreleased fork of PyKCS11.
""" """
def __init__(self, conf=cfg.CONF): def __init__(self, conf=cfg.CONF):
self.block_size = 16 # in bytes self.block_size = 16 # in bytes
self.kek_key_length = 32 # in bytes (256-bit)
self.algorithm = 0x8000011c # CKM_AES_GCM vendor prefixed. self.algorithm = 0x8000011c # CKM_AES_GCM vendor prefixed.
self.pkcs11 = PyKCS11.PyKCS11Lib() self.pkcs11 = PyKCS11.PyKCS11Lib()
if conf.p11_crypto_plugin.library_path is None: if conf.p11_crypto_plugin.library_path is None:
@ -70,16 +81,76 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
self.session.login(conf.p11_crypto_plugin.login) self.session.login(conf.p11_crypto_plugin.login)
self.rw_session = self.pkcs11.openSession(1, PyKCS11.CKF_RW_SESSION) self.rw_session = self.pkcs11.openSession(1, PyKCS11.CKF_RW_SESSION)
self.rw_session.login(conf.p11_crypto_plugin.login) self.rw_session.login(conf.p11_crypto_plugin.login)
self.current_mkek_label = conf.p11_crypto_plugin.mkek_label
self.current_hmac_label = conf.p11_crypto_plugin.hmac_label
LOG.debug("Current mkek label: %s", self.current_mkek_label)
LOG.debug("Current hmac label: %s", self.current_hmac_label)
self.key_handles = {}
# cache current MKEK handle in the dictionary
self._get_or_generate_mkek(
self.current_mkek_label,
conf.p11_crypto_plugin.mkek_length
)
self._get_or_generate_hmac_key(self.current_hmac_label)
def _check_error(self, value): def _check_error(self, value):
if value != PyKCS11.CKR_OK: if value != PyKCS11.CKR_OK:
raise PyKCS11.PyKCS11Error(value) raise PyKCS11.PyKCS11Error(value)
def _get_key_by_label(self, key_label): def _get_or_generate_mkek(self, mkek_label, mkek_key_length):
mkek = self._get_key_handle(mkek_label)
if not mkek:
# Generate a key that is persistent and not extractable
template = (
(PyKCS11.CKA_CLASS, PyKCS11.CKO_SECRET_KEY),
(PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_AES),
(PyKCS11.CKA_VALUE_LEN, mkek_key_length),
(PyKCS11.CKA_LABEL, mkek_label),
(PyKCS11.CKA_PRIVATE, True),
(PyKCS11.CKA_SENSITIVE, True),
(PyKCS11.CKA_ENCRYPT, True),
(PyKCS11.CKA_DECRYPT, True),
(PyKCS11.CKA_SIGN, True),
(PyKCS11.CKA_VERIFY, True),
(PyKCS11.CKA_TOKEN, True),
(PyKCS11.CKA_WRAP, True),
(PyKCS11.CKA_UNWRAP, True),
(PyKCS11.CKA_EXTRACTABLE, False))
mkek = self._generate_kek(template)
self.key_handles[mkek_label] = mkek
return mkek
def _get_or_generate_hmac_key(self, hmac_label):
hmac_key = self._get_key_handle(hmac_label)
if not hmac_key:
# Generate a key that is persistent and not extractable
template = (
(PyKCS11.CKA_CLASS, PyKCS11.CKO_SECRET_KEY),
(PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_AES),
(PyKCS11.CKA_VALUE_LEN, 32),
(PyKCS11.CKA_LABEL, hmac_label),
(PyKCS11.CKA_PRIVATE, True),
(PyKCS11.CKA_SENSITIVE, True),
(PyKCS11.CKA_SIGN, True),
(PyKCS11.CKA_VERIFY, True),
(PyKCS11.CKA_TOKEN, True),
(PyKCS11.CKA_EXTRACTABLE, False))
hmac_key = self._generate_kek(template)
self.key_handles[hmac_label] = hmac_key
return hmac_key
def _get_key_handle(self, mkek_label):
if mkek_label in self.key_handles:
return self.key_handles[mkek_label]
template = ( template = (
(PyKCS11.CKA_CLASS, PyKCS11.CKO_SECRET_KEY), (PyKCS11.CKA_CLASS, PyKCS11.CKO_SECRET_KEY),
(PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_AES), (PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_AES),
(PyKCS11.CKA_LABEL, key_label)) (PyKCS11.CKA_LABEL, mkek_label))
keys = self.session.findObjects(template) keys = self.session.findObjects(template)
if len(keys) == 1: if len(keys) == 1:
return keys[0] return keys[0]
@ -103,21 +174,11 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
gcm.ulTagBits = 128 gcm.ulTagBits = 128
return gcm return gcm
def _generate_kek(self, kek_label): def _generate_kek(self, template):
# TODO(reaperhulk): review template to ensure it's what we want """Generates both master and project KEKs
template = (
(PyKCS11.CKA_CLASS, PyKCS11.CKO_SECRET_KEY), :param template: A tuple of tuples in (CKA_TYPE, VALUE) form
(PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_AES), """
(PyKCS11.CKA_VALUE_LEN, self.kek_key_length),
(PyKCS11.CKA_LABEL, kek_label),
(PyKCS11.CKA_PRIVATE, True),
(PyKCS11.CKA_SENSITIVE, True),
(PyKCS11.CKA_ENCRYPT, True),
(PyKCS11.CKA_DECRYPT, True),
(PyKCS11.CKA_TOKEN, True),
(PyKCS11.CKA_WRAP, True),
(PyKCS11.CKA_UNWRAP, True),
(PyKCS11.CKA_EXTRACTABLE, False))
ckattr = self.session._template2ckattrlist(template) ckattr = self.session._template2ckattrlist(template)
m = PyKCS11.LowLevel.CK_MECHANISM() m = PyKCS11.LowLevel.CK_MECHANISM()
@ -132,9 +193,145 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
key key
) )
) )
return key
def _generate_wrapped_kek(self, kek_label, key_length):
# generate a non-persistent key that is extractable
template = (
(PyKCS11.CKA_CLASS, PyKCS11.CKO_SECRET_KEY),
(PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_AES),
(PyKCS11.CKA_VALUE_LEN, key_length),
(PyKCS11.CKA_LABEL, kek_label),
(PyKCS11.CKA_PRIVATE, True),
(PyKCS11.CKA_SENSITIVE, True),
(PyKCS11.CKA_ENCRYPT, True),
(PyKCS11.CKA_DECRYPT, True),
(PyKCS11.CKA_TOKEN, False), # not persistent
(PyKCS11.CKA_WRAP, True),
(PyKCS11.CKA_UNWRAP, True),
(PyKCS11.CKA_EXTRACTABLE, True)) # extractable
kek = self._generate_kek(template)
m = PyKCS11.LowLevel.CK_MECHANISM()
m.mechanism = PyKCS11.LowLevel.CKM_AES_CBC_PAD
iv = self._generate_iv()
m.pParameter = iv
encrypted = PyKCS11.ckbytelist()
mkek = self.key_handles[self.current_mkek_label]
# first call reserves the bytes required in the ckbytelist
self._check_error(
self.pkcs11.lib.C_WrapKey(
self.rw_session.session, m, mkek, kek, encrypted
)
)
# second call wraps and stores to encrypted
self._check_error(
self.pkcs11.lib.C_WrapKey(
self.rw_session.session, m, mkek, kek, encrypted
)
)
wrapped_key = b''.join(chr(i) for i in encrypted)
hmac = self._compute_hmac(encrypted)
return {
'iv': base64.b64encode(iv),
'wrapped_key': base64.b64encode(wrapped_key),
'hmac': base64.b64encode(hmac),
'mkek_label': self.current_mkek_label,
'hmac_label': self.current_hmac_label
}
def _compute_hmac(self, wrapped_bytelist):
m = PyKCS11.LowLevel.CK_MECHANISM()
m.mechanism = PyKCS11.LowLevel.CKM_SHA256_HMAC
hmac_bytelist = PyKCS11.ckbytelist()
hmac_key = self.key_handles[self.current_hmac_label]
self._check_error(
self.pkcs11.lib.C_SignInit(self.rw_session.session, m, hmac_key)
)
# first call reserves the bytes required in the ckbytelist
self._check_error(
self.pkcs11.lib.C_Sign(
self.rw_session.session, wrapped_bytelist, hmac_bytelist
)
)
# second call computes HMAC
self._check_error(
self.pkcs11.lib.C_Sign(
self.rw_session.session, wrapped_bytelist, hmac_bytelist
)
)
return b''.join(chr(i) for i in hmac_bytelist)
def _verify_hmac(self, hmac_key, hmac_bytelist, wrapped_bytelist):
m = PyKCS11.LowLevel.CK_MECHANISM()
m.mechanism = PyKCS11.LowLevel.CKM_SHA256_HMAC
self._check_error(
self.pkcs11.lib.C_VerifyInit(self.rw_session.session, m, hmac_key)
)
self._check_error(
self.pkcs11.lib.C_Verify(
self.rw_session.session, wrapped_bytelist, hmac_bytelist
)
)
def _unwrap_key(self, plugin_meta):
"""Unwraps byte string to key handle in HSM.
:param plugin_meta: kek_meta_dto plugin meta (json string)
:returns: Key handle from HSM. No unencrypted bytes.
"""
meta = json.loads(plugin_meta)
iv = base64.b64decode(meta['iv'])
hmac = base64.b64decode(meta['hmac'])
wrapped_key = base64.b64decode(meta['wrapped_key'])
mkek = self._get_key_handle(meta['mkek_label'])
hmac_key = self._get_key_handle(meta['hmac_label'])
LOG.debug("Unwrapping key with %s mkek label", meta['mkek_label'])
hmac_bytelist = PyKCS11.ckbytelist()
hmac_bytelist.reserve(len(hmac))
for x in hmac:
hmac_bytelist.append(ord(x))
wrapped_bytelist = PyKCS11.ckbytelist()
wrapped_bytelist.reserve(len(wrapped_key))
for x in wrapped_key:
wrapped_bytelist.append(ord(x))
LOG.debug("Verifying key with %s hmac label", meta['hmac_label'])
self._verify_hmac(hmac_key, hmac_bytelist, wrapped_bytelist)
unwrapped = PyKCS11.LowLevel.CK_OBJECT_HANDLE()
m = PyKCS11.LowLevel.CK_MECHANISM()
m.mechanism = PyKCS11.LowLevel.CKM_AES_CBC_PAD
m.pParameter = iv
template = (
(PyKCS11.CKA_CLASS, PyKCS11.CKO_SECRET_KEY),
(PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_AES),
(PyKCS11.CKA_ENCRYPT, True),
(PyKCS11.CKA_DECRYPT, True),
(PyKCS11.CKA_TOKEN, False),
(PyKCS11.CKA_WRAP, True),
(PyKCS11.CKA_UNWRAP, True),
(PyKCS11.CKA_EXTRACTABLE, True)
)
ckattr = self.session._template2ckattrlist(template)
self._check_error(
self.pkcs11.lib.C_UnwrapKey(
self.rw_session.session,
m,
mkek,
wrapped_bytelist,
ckattr,
unwrapped
)
)
return unwrapped
def encrypt(self, encrypt_dto, kek_meta_dto, keystone_id): def encrypt(self, encrypt_dto, kek_meta_dto, keystone_id):
key = self._get_key_by_label(kek_meta_dto.kek_label) key = self._unwrap_key(kek_meta_dto.plugin_meta)
iv = self._generate_iv() iv = self._generate_iv()
gcm = self._build_gcm_params(iv) gcm = self._build_gcm_params(iv)
mech = PyKCS11.Mechanism(self.algorithm, gcm) mech = PyKCS11.Mechanism(self.algorithm, gcm)
@ -148,7 +345,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
def decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended, def decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended,
keystone_id): keystone_id):
key = self._get_key_by_label(kek_meta_dto.kek_label) key = self._unwrap_key(kek_meta_dto.plugin_meta)
meta_extended = json.loads(kek_meta_extended) meta_extended = json.loads(kek_meta_extended)
iv = base64.b64decode(meta_extended['iv']) iv = base64.b64decode(meta_extended['iv'])
gcm = self._build_gcm_params(iv) gcm = self._build_gcm_params(iv)
@ -158,16 +355,17 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
return secret return secret
def bind_kek_metadata(self, kek_meta_dto): def bind_kek_metadata(self, kek_meta_dto):
# Enforce idempotency: If we've already generated a key for the # Enforce idempotency: If we've already generated a key leave now.
# kek_label, leave now. if not kek_meta_dto.plugin_meta:
key = self._get_key_by_label(kek_meta_dto.kek_label) kek_meta_dto.plugin_meta = json.dumps(
if not key: self._generate_wrapped_kek(
self._generate_kek(kek_meta_dto.kek_label) kek_meta_dto.kek_label, 32
)
)
# To be persisted by Barbican: # To be persisted by Barbican:
kek_meta_dto.algorithm = 'AES' kek_meta_dto.algorithm = 'AES'
kek_meta_dto.bit_length = self.kek_key_length * 8 kek_meta_dto.bit_length = 32 * 8
kek_meta_dto.mode = 'GCM' kek_meta_dto.mode = 'CBC'
kek_meta_dto.plugin_meta = None
return kek_meta_dto return kek_meta_dto

View File

@ -34,6 +34,7 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase):
self.pkcs11 = self.p11_mock.PyKCS11Lib() self.pkcs11 = self.p11_mock.PyKCS11Lib()
self.p11_mock.PyKCS11Error.return_value = Exception() self.p11_mock.PyKCS11Error.return_value = Exception()
self.pkcs11.lib.C_Initialize.return_value = self.p11_mock.CKR_OK self.pkcs11.lib.C_Initialize.return_value = self.p11_mock.CKR_OK
self.pkcs11.lib.C_GenerateKey.return_value = self.p11_mock.CKR_OK
self.cfg_mock = mock.MagicMock(name='config mock') self.cfg_mock = mock.MagicMock(name='config mock')
self.plugin = p11_crypto.P11CryptoPlugin(self.cfg_mock) self.plugin = p11_crypto.P11CryptoPlugin(self.cfg_mock)
self.session = self.pkcs11.openSession() self.session = self.pkcs11.openSession()
@ -43,22 +44,25 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase):
self.patcher.stop() self.patcher.stop()
def test_generate_calls_generate_random(self): def test_generate_calls_generate_random(self):
self.session.generateRandom.return_value = [1, 2, 3, 4, 5, 6, 7, with mock.patch.object(self.plugin, 'encrypt') as encrypt_mock:
8, 9, 10, 11, 12, 13, # patch out the encrypt call since it is irrelevant in this test
14, 15, 16] encrypt_mock.return_value = None
secret = models.Secret() self.session.generateRandom.return_value = [1, 2, 3, 4, 5, 6, 7,
secret.bit_length = 128 8, 9, 10, 11, 12, 13,
secret.algorithm = "AES" 14, 15, 16]
generate_dto = plugin_import.GenerateDTO( secret = models.Secret()
secret.algorithm, secret.bit_length = 128
secret.bit_length, secret.algorithm = "AES"
None, None) generate_dto = plugin_import.GenerateDTO(
self.plugin.generate_symmetric( secret.algorithm,
generate_dto, secret.bit_length,
mock.MagicMock(), None, None)
mock.MagicMock() self.plugin.generate_symmetric(
) generate_dto,
self.session.generateRandom.assert_called_twice_with(16) mock.MagicMock(),
mock.MagicMock()
)
self.session.generateRandom.assert_called_twice_with(16)
def test_generate_errors_when_rand_length_is_not_as_requested(self): def test_generate_errors_when_rand_length_is_not_as_requested(self):
self.session.generateRandom.return_value = [1, 2, 3, 4, 5, 6, 7] self.session.generateRandom.return_value = [1, 2, 3, 4, 5, 6, 7]
@ -91,7 +95,6 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase):
self.pkcs11.lib.C_Initialize.return_value = 12345 self.pkcs11.lib.C_Initialize.return_value = 12345
m.p11_crypto_plugin = mock.MagicMock(library_path="/dev/null") m.p11_crypto_plugin = mock.MagicMock(library_path="/dev/null")
# TODO(reaperhulk): Really raises PyKCS11.PyKCS11Error
pykcs11error = Exception pykcs11error = Exception
self.assertRaises( self.assertRaises(
pykcs11error, pykcs11error,
@ -99,28 +102,23 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase):
m, m,
) )
def test_init_builds_sessions_and_login(self): def test_get_key_handle_with_two_keys(self):
self.pkcs11.openSession.assert_any_call(1)
self.pkcs11.openSession.assert_any_call(1, 'RW')
self.assertTrue(self.session.login.called)
def test_get_key_by_label_with_two_keys(self):
self.session.findObjects.return_value = ['key1', 'key2'] self.session.findObjects.return_value = ['key1', 'key2']
self.assertRaises( self.assertRaises(
p11_crypto.P11CryptoPluginKeyException, p11_crypto.P11CryptoPluginKeyException,
self.plugin._get_key_by_label, self.plugin._get_key_handle,
'mylabel', 'mylabel',
) )
def test_get_key_by_label_with_one_key(self): def test_get_key_handle_with_one_key(self):
key = 'key1' key = 'key1'
self.session.findObjects.return_value = [key] self.session.findObjects.return_value = [key]
key_label = self.plugin._get_key_by_label('mylabel') key_label = self.plugin._get_key_handle('mylabel')
self.assertEqual(key, key_label) self.assertEqual(key, key_label)
def test_get_key_by_label_with_no_keys(self): def test_get_key_handle_with_no_keys(self):
self.session.findObjects.return_value = [] self.session.findObjects.return_value = []
result = self.plugin._get_key_by_label('mylabel') result = self.plugin._get_key_handle('mylabel')
self.assertIsNone(result) self.assertIsNone(result)
def test_generate_iv_calls_generate_random(self): def test_generate_iv_calls_generate_random(self):
@ -140,14 +138,14 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase):
) )
def test_build_gcm_params(self): def test_build_gcm_params(self):
class GCM_Mock(object): class GCMMock(object):
def __init__(self): def __init__(self):
self.pIv = None self.pIv = None
self.ulIvLen = None self.ulIvLen = None
self.ulIvBits = None self.ulIvBits = None
self.ulTagBits = None self.ulTagBits = None
self.p11_mock.LowLevel.CK_AES_GCM_PARAMS.return_value = GCM_Mock() self.p11_mock.LowLevel.CK_AES_GCM_PARAMS.return_value = GCMMock()
iv = b'sixteen_byte_iv_' iv = b'sixteen_byte_iv_'
gcm = self.plugin._build_gcm_params(iv) gcm = self.plugin._build_gcm_params(iv)
self.assertEqual(iv, gcm.pIv) self.assertEqual(iv, gcm.pIv)
@ -156,9 +154,7 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase):
self.assertEqual(128, gcm.ulIvBits) self.assertEqual(128, gcm.ulIvBits)
def test_encrypt(self): def test_encrypt(self):
key = 'key1'
payload = 'encrypt me!!' payload = 'encrypt me!!'
self.session.findObjects.return_value = [key]
self.session.generateRandom.return_value = [1, 2, 3, 4, 5, 6, 7, self.session.generateRandom.return_value = [1, 2, 3, 4, 5, 6, 7,
8, 9, 10, 11, 12, 13, 8, 9, 10, 11, 12, 13,
14, 15, 16] 14, 15, 16]
@ -166,35 +162,37 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase):
self.p11_mock.Mechanism.return_value = mech self.p11_mock.Mechanism.return_value = mech
self.session.encrypt.return_value = [1, 2, 3, 4, 5] self.session.encrypt.return_value = [1, 2, 3, 4, 5]
encrypt_dto = plugin_import.EncryptDTO(payload) encrypt_dto = plugin_import.EncryptDTO(payload)
response_dto = self.plugin.encrypt(encrypt_dto, with mock.patch.object(self.plugin, '_unwrap_key') as unwrap_key_mock:
mock.MagicMock(), unwrap_key_mock.return_value = 'unwrapped_key'
mock.MagicMock()) response_dto = self.plugin.encrypt(encrypt_dto,
mock.MagicMock(),
mock.MagicMock())
self.session.encrypt.assert_called_once_with(key, self.session.encrypt.assert_called_once_with('unwrapped_key',
payload, payload,
mech) mech)
self.assertEqual(b'\x01\x02\x03\x04\x05', response_dto.cypher_text) self.assertEqual(b'\x01\x02\x03\x04\x05', response_dto.cypher_text)
self.assertEqual('{"iv": "AQIDBAUGBwgJCgsMDQ4PEA=="}', self.assertEqual('{"iv": "AQIDBAUGBwgJCgsMDQ4PEA=="}',
response_dto.kek_meta_extended) response_dto.kek_meta_extended)
def test_decrypt(self): def test_decrypt(self):
key = 'key1'
ct = mock.MagicMock() ct = mock.MagicMock()
self.session.findObjects.return_value = [key]
self.session.decrypt.return_value = [100, 101, 102, 103] self.session.decrypt.return_value = [100, 101, 102, 103]
mech = mock.MagicMock() mech = mock.MagicMock()
self.p11_mock.Mechanism.return_value = mech self.p11_mock.Mechanism.return_value = mech
kek_meta_extended = '{"iv": "AQIDBAUGBwgJCgsMDQ4PEA=="}' kek_meta_extended = '{"iv": "AQIDBAUGBwgJCgsMDQ4PEA=="}'
decrypt_dto = plugin_import.DecryptDTO(ct) decrypt_dto = plugin_import.DecryptDTO(ct)
payload = self.plugin.decrypt(decrypt_dto, with mock.patch.object(self.plugin, '_unwrap_key') as unwrap_key_mock:
mock.MagicMock(), unwrap_key_mock.return_value = 'unwrapped_key'
kek_meta_extended, payload = self.plugin.decrypt(decrypt_dto,
mock.MagicMock()) mock.MagicMock(),
self.assertTrue(self.p11_mock.Mechanism.called) kek_meta_extended,
self.session.decrypt.assert_called_once_with(key, mock.MagicMock())
ct, self.assertTrue(self.p11_mock.Mechanism.called)
mech) self.session.decrypt.assert_called_once_with('unwrapped_key',
self.assertEqual(b'defg', payload) ct,
mech)
self.assertEqual(b'defg', payload)
def test_bind_kek_metadata_without_existing_key(self): def test_bind_kek_metadata_without_existing_key(self):
self.session.findObjects.return_value = [] # no existing key self.session.findObjects.return_value = [] # no existing key
@ -208,15 +206,8 @@ class WhenTestingP11CryptoPlugin(testtools.TestCase):
def test_bind_kek_metadata_with_existing_key(self): def test_bind_kek_metadata_with_existing_key(self):
self.session.findObjects.return_value = ['key1'] # one key self.session.findObjects.return_value = ['key1'] # one key
self.plugin.bind_kek_metadata(mock.MagicMock()) dto_mock = mock.MagicMock()
self.assertEqual(self.plugin.bind_kek_metadata(dto_mock), dto_mock)
gk = self.pkcs11.lib.C_Generate_Key
# this is a way to test to make sure methods are NOT called
self.assertEqual([], gk.call_args_list)
t = self.session._template2ckattrlist
self.assertEqual([], t.call_args_list)
m = self.p11_mock.LowLevel.CK_MECHANISM
self.assertEqual([], m.call_args_list)
def test_generate_asymmetric_raises_error(self): def test_generate_asymmetric_raises_error(self):
self.assertRaises(NotImplementedError, self.assertRaises(NotImplementedError,

View File

@ -168,6 +168,19 @@ dogtag_port = 8443
nss_db_path = '/etc/barbican/alias' nss_db_path = '/etc/barbican/alias'
nss_password = 'password123' nss_password = 'password123'
[p11_crypto_plugin]
# Path to vendor PKCS11 library
library_path = '/usr/lib/libCryptoki2_64.so'
# Password to login to PKCS11 session
login = 'mypassword'
# Label to identify master KEK in the HSM (must not be the same as HMAC label)
mkek_label = 'an_mkek'
# Length in bytes of master KEK
mkek_length = 32
# Label to identify HMAC key in the HSM (must not be the same as MKEK label)
hmac_label = 'my_hmac_label'
# ================== KMIP plugin ===================== # ================== KMIP plugin =====================
[kmip_plugin] [kmip_plugin]
username = 'admin' username = 'admin'