Fix KEK generation in the P11 Plugin + fix unit tests
Buckle up, this is going to be a long commit msg. Background: * Henry Yamauchi noted that the bind_kek_metadata method was returning an object early. This return prevented the entirety of the code that created the KEK on the HSM from executing * This was not caught because the mock tests that checked for invocation of the method were wrong. Specifically they called methods like assert_called_once() and assert_called_twice(). Unfortunately, while assert_called_once_with(), et al are real methods on the mock objects that really do assert things, assert_called_once() is not. And when you call a method that doesn't exist on a mock object it returns another mock object. So many of these tests were testing nothing. Fixes: * bind_kek_metadata has been refactored to separate out the kek generation into a new method (_generate_kek) to make the expected flow clearer. And, of course, the bug has been fixed * The tests have had all the incorrect assert calls removed. In some cases the call was actually unnecessary (typically because the test already used a return value from the mock so testing that it was called was not needed). In other cases an assert was added that checks that the called method on the mock object is true Change-Id: I17a7adf7e3f386f3b0474e79731e7faab8911481
This commit is contained in:
@ -94,6 +94,36 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
gcm.ulTagBits = 128
|
||||
return gcm
|
||||
|
||||
def _generate_kek(self, kek_label):
|
||||
# TODO: review template to ensure it's what we want
|
||||
template = (
|
||||
(PyKCS11.CKA_CLASS, PyKCS11.CKO_SECRET_KEY),
|
||||
(PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_AES),
|
||||
(PyKCS11.CKA_VALUE_LEN, self.kek_key_length),
|
||||
(PyKCS11.CKA_LABEL, kek_label),
|
||||
(PyKCS11.CKA_PRIVATE, True),
|
||||
(PyKCS11.CKA_SENSITIVE, True),
|
||||
(PyKCS11.CKA_ENCRYPT, True),
|
||||
(PyKCS11.CKA_DECRYPT, True),
|
||||
(PyKCS11.CKA_TOKEN, True),
|
||||
(PyKCS11.CKA_WRAP, True),
|
||||
(PyKCS11.CKA_UNWRAP, True),
|
||||
(PyKCS11.CKA_EXTRACTABLE, False))
|
||||
ckattr = self.session._template2ckattrlist(template)
|
||||
|
||||
m = PyKCS11.LowLevel.CK_MECHANISM()
|
||||
m.mechanism = PyKCS11.LowLevel.CKM_AES_KEY_GEN
|
||||
|
||||
key = PyKCS11.LowLevel.CK_OBJECT_HANDLE()
|
||||
self._check_error(
|
||||
self.pkcs11.lib.C_GenerateKey(
|
||||
self.rw_session.session,
|
||||
m,
|
||||
ckattr,
|
||||
key
|
||||
)
|
||||
)
|
||||
|
||||
def encrypt(self, unencrypted, kek_meta_dto, keystone_id):
|
||||
key = self._get_key_by_label(kek_meta_dto.kek_label)
|
||||
iv = self._generate_iv()
|
||||
@ -121,46 +151,16 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
# Enforce idempotency: If we've already generated a key for the
|
||||
# kek_label, leave now.
|
||||
key = self._get_key_by_label(kek_meta_dto.kek_label)
|
||||
if key:
|
||||
return kek_meta_dto
|
||||
if not key:
|
||||
self._generate_kek(kek_meta_dto.kek_label)
|
||||
# To be persisted by Barbican:
|
||||
kek_meta_dto.algorithm = 'AES'
|
||||
kek_meta_dto.bit_length = self.kek_key_length * 8
|
||||
kek_meta_dto.mode = 'GCM'
|
||||
kek_meta_dto.plugin_meta = None
|
||||
|
||||
# To be persisted by Barbican:
|
||||
kek_meta_dto.algorithm = 'AES'
|
||||
kek_meta_dto.bit_length = self.kek_key_length * 8
|
||||
kek_meta_dto.mode = 'GCM'
|
||||
kek_meta_dto.plugin_meta = None
|
||||
return kek_meta_dto
|
||||
|
||||
# Generate the key.
|
||||
# TODO: review template to ensure it's what we want
|
||||
template = (
|
||||
(PyKCS11.CKA_CLASS, PyKCS11.CKO_SECRET_KEY),
|
||||
(PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_AES),
|
||||
(PyKCS11.CKA_VALUE_LEN, self.kek_key_length),
|
||||
(PyKCS11.CKA_LABEL, kek_meta_dto.kek_label),
|
||||
(PyKCS11.CKA_PRIVATE, True),
|
||||
(PyKCS11.CKA_SENSITIVE, True),
|
||||
(PyKCS11.CKA_ENCRYPT, True),
|
||||
(PyKCS11.CKA_DECRYPT, True),
|
||||
(PyKCS11.CKA_TOKEN, True),
|
||||
(PyKCS11.CKA_WRAP, True),
|
||||
(PyKCS11.CKA_UNWRAP, True),
|
||||
(PyKCS11.CKA_EXTRACTABLE, False))
|
||||
ckattr = self.session._template2ckattrlist(template)
|
||||
|
||||
m = PyKCS11.LowLevel.CK_MECHANISM()
|
||||
m.mechanism = PyKCS11.LowLevel.CKM_AES_KEY_GEN
|
||||
|
||||
key = PyKCS11.LowLevel.CK_OBJECT_HANDLE()
|
||||
self._check_error(
|
||||
self.pkcs11.lib.C_GenerateKey(
|
||||
self.rw_session.session,
|
||||
m,
|
||||
ckattr,
|
||||
key
|
||||
)
|
||||
)
|
||||
|
||||
def create(self, bit_length, type_enum, algorithm=None, mode=None):
|
||||
byte_length = bit_length / 8
|
||||
rand = self.session.generateRandom(byte_length)
|
||||
|
@ -69,26 +69,22 @@ class WhenTestingP11CryptoPlugin(unittest.TestCase):
|
||||
|
||||
def test_init_builds_sessions_and_login(self):
|
||||
self.pkcs11.openSession.assert_any_call(1)
|
||||
self.pkcs11.openSession.login.assert_called_twice()
|
||||
self.pkcs11.openSession.assert_any_call(1, 'RW')
|
||||
self.session.login.assert_called_twice()
|
||||
self.assertTrue(self.session.login.called)
|
||||
|
||||
def test_get_key_by_label_with_two_keys(self):
|
||||
self.session.findObjects.return_value = ['key1', 'key2']
|
||||
self.session.findObjects.assert_called_once()
|
||||
with self.assertRaises(p11_crypto.P11CryptoPluginKeyException):
|
||||
self.plugin._get_key_by_label('mylabel')
|
||||
|
||||
def test_get_key_by_label_with_one_key(self):
|
||||
key = 'key1'
|
||||
self.session.findObjects.return_value = [key]
|
||||
self.session.findObjects.assert_called_once()
|
||||
key_label = self.plugin._get_key_by_label('mylabel')
|
||||
self.assertEqual(key, key_label)
|
||||
|
||||
def test_get_key_by_label_with_no_keys(self):
|
||||
self.session.findObjects.return_value = []
|
||||
self.session.findObjects.assert_called_once()
|
||||
result = self.plugin._get_key_by_label('mylabel')
|
||||
self.assertIsNone(result)
|
||||
|
||||
@ -136,7 +132,6 @@ class WhenTestingP11CryptoPlugin(unittest.TestCase):
|
||||
mock.MagicMock(),
|
||||
mock.MagicMock())
|
||||
|
||||
self.p11_mock.Mechanism.assert_called_once()
|
||||
self.session.encrypt.assert_called_once_with(key,
|
||||
payload,
|
||||
mech)
|
||||
@ -156,7 +151,7 @@ class WhenTestingP11CryptoPlugin(unittest.TestCase):
|
||||
mock.MagicMock(),
|
||||
kek_meta_extended,
|
||||
mock.MagicMock())
|
||||
self.p11_mock.Mechanism.assert_called_once()
|
||||
self.assertTrue(self.p11_mock.Mechanism.called)
|
||||
self.session.decrypt.assert_called_once_with(key,
|
||||
ct,
|
||||
mech)
|
||||
@ -168,16 +163,15 @@ class WhenTestingP11CryptoPlugin(unittest.TestCase):
|
||||
|
||||
self.plugin.bind_kek_metadata(mock.MagicMock())
|
||||
|
||||
self.p11_mock.lib.C_Generate_Key.assert_called_once()
|
||||
self.session._template2ckattrlist.assert_called_once()
|
||||
self.p11_mock.LowLevel.CK_MECHANISM.assert_called_once()
|
||||
self.assertTrue(self.session._template2ckattrlist.called)
|
||||
self.assertTrue(self.p11_mock.LowLevel.CK_MECHANISM.called)
|
||||
|
||||
def test_bind_kek_metadata_with_existing_key(self):
|
||||
self.session.findObjects.return_value = ['key1'] # one key
|
||||
|
||||
self.plugin.bind_kek_metadata(mock.MagicMock())
|
||||
|
||||
gk = self.p11_mock.lib.C_Generate_Key
|
||||
gk = self.pkcs11.lib.C_Generate_Key
|
||||
# this is a way to test to make sure methods are NOT called
|
||||
self.assertItemsEqual([], gk.call_args_list)
|
||||
t = self.session._template2ckattrlist
|
||||
|
Reference in New Issue
Block a user