Add retry for recoverable PKCS11 errors
When using the p11_crypto module with an HSM, certain errors can be thrown by the device that currently require the Barbican application to be restarted to recover. This CR adds to work already done to the pkcs11 module that will trap known errors and will raise a specific exception that can be handled gracefully without the need to restart the entire application. In addition, the p11_crypto module has been enhanced to use a retry mechanism when these known errors are raised after reinitializing the pkcs11 library. This was done specifically to trap the CKR_TOKEN_NOT_PRESENT error from an HSM, but can be enhanced further in the future to handle additional error conditions that are recoverable with a simple reinitialization of the library to prevent the need to restart the entire Barbican application. Change-Id: Ic43f3729bff00560d4a344f785416546c019e016 Closes-Bug: 1582884
This commit is contained in:
parent
312a8753f9
commit
88aac6e6f1
@ -510,3 +510,23 @@ class SubCADeletionErrors(BarbicanHTTPException):
|
||||
message = u._("Errors returned by CA when attempting to delete "
|
||||
"subordinate CA: %(reason)")
|
||||
client_message = message
|
||||
|
||||
|
||||
class PKCS11Exception(BarbicanException):
|
||||
message = u._("There was an error with the PKCS#11 library.")
|
||||
|
||||
|
||||
class P11CryptoPluginKeyException(PKCS11Exception):
|
||||
message = u._("More than one key found for label")
|
||||
|
||||
|
||||
class P11CryptoPluginException(PKCS11Exception):
|
||||
message = u._("General exception")
|
||||
|
||||
|
||||
class P11CryptoKeyHandleException(PKCS11Exception):
|
||||
message = u._("No key handle was found")
|
||||
|
||||
|
||||
class P11CryptoTokenException(PKCS11Exception):
|
||||
message = u._("No token was found in slot %(slot_id)s")
|
||||
|
@ -20,6 +20,7 @@ from oslo_config import cfg
|
||||
from oslo_serialization import jsonutils as json
|
||||
|
||||
from barbican.common import config
|
||||
from barbican.common import exception
|
||||
from barbican.common import utils
|
||||
from barbican import i18n as u
|
||||
from barbican.plugin.crypto import crypto as plugin
|
||||
@ -95,23 +96,48 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
self.pkek_cache_limit = plugin_conf.pkek_cache_limit
|
||||
self.algorithm = plugin_conf.algorithm
|
||||
|
||||
# Master Key cache
|
||||
self.mk_cache = {}
|
||||
self.mk_cache_lock = threading.RLock()
|
||||
|
||||
# Project KEK cache
|
||||
self.pkek_cache = collections.OrderedDict()
|
||||
self.pkek_cache_lock = threading.RLock()
|
||||
|
||||
# Session for object caching
|
||||
self.caching_session = self.pkcs11.get_session()
|
||||
self.caching_session_lock = threading.RLock()
|
||||
|
||||
# Cache master keys
|
||||
self._get_master_key(self.mkek_label)
|
||||
self._get_master_key(self.hmac_label)
|
||||
self._configure_object_cache()
|
||||
|
||||
def encrypt(self, encrypt_dto, kek_meta_dto, project_id):
|
||||
return self._call_pkcs11(self._encrypt, encrypt_dto, kek_meta_dto,
|
||||
project_id)
|
||||
|
||||
def decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended,
|
||||
project_id):
|
||||
return self._call_pkcs11(self._decrypt, decrypt_dto, kek_meta_dto,
|
||||
kek_meta_extended, project_id)
|
||||
|
||||
def bind_kek_metadata(self, kek_meta_dto):
|
||||
return self._call_pkcs11(self._bind_kek_metadata, kek_meta_dto)
|
||||
|
||||
def generate_symmetric(self, generate_dto, kek_meta_dto, project_id):
|
||||
return self._call_pkcs11(self._generate_symmetric, generate_dto,
|
||||
kek_meta_dto, project_id)
|
||||
|
||||
def generate_asymmetric(self, generate_dto, kek_meta_dto, project_id):
|
||||
raise NotImplementedError(u._("Feature not implemented for PKCS11"))
|
||||
|
||||
def supports(self, type_enum, algorithm=None, bit_length=None, mode=None):
|
||||
if type_enum == plugin.PluginSupportTypes.ENCRYPT_DECRYPT:
|
||||
return True
|
||||
elif type_enum == plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION:
|
||||
return True
|
||||
elif type_enum == plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
||||
def _call_pkcs11(self, func, *args, **kwargs):
|
||||
# Wrap pkcs11 calls to enable a single retry when exceptions are raised
|
||||
# that can be fixed by reinitializing the pkcs11 library
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except (exception.PKCS11Exception) as pe:
|
||||
LOG.warn("Reinitializing PKCS#11 library: {e}".format(e=pe))
|
||||
self._reinitialize_pkcs11()
|
||||
return func(*args, **kwargs)
|
||||
|
||||
def _encrypt(self, encrypt_dto, kek_meta_dto, project_id):
|
||||
kek = self._load_kek_from_meta_dto(kek_meta_dto)
|
||||
try:
|
||||
session = self._get_session()
|
||||
@ -127,8 +153,8 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
)
|
||||
return plugin.ResponseDTO(ct_data['ct'], kek_meta_extended)
|
||||
|
||||
def decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended,
|
||||
project_id):
|
||||
def _decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended,
|
||||
project_id):
|
||||
kek = self._load_kek_from_meta_dto(kek_meta_dto)
|
||||
meta_extended = json.loads(kek_meta_extended)
|
||||
iv = base64.b64decode(meta_extended['iv'])
|
||||
@ -144,7 +170,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
|
||||
return pt_data
|
||||
|
||||
def bind_kek_metadata(self, kek_meta_dto):
|
||||
def _bind_kek_metadata(self, kek_meta_dto):
|
||||
if not kek_meta_dto.plugin_meta:
|
||||
# Generate wrapped kek and jsonify
|
||||
wkek = self._generate_wrapped_kek(
|
||||
@ -158,7 +184,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
kek_meta_dto.mode = 'CBC'
|
||||
return kek_meta_dto
|
||||
|
||||
def generate_symmetric(self, generate_dto, kek_meta_dto, project_id):
|
||||
def _generate_symmetric(self, generate_dto, kek_meta_dto, project_id):
|
||||
kek = self._load_kek_from_meta_dto(kek_meta_dto)
|
||||
byte_length = int(generate_dto.bit_length) // 8
|
||||
|
||||
@ -175,18 +201,22 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
)
|
||||
return plugin.ResponseDTO(ct_data['ct'], kek_meta_extended)
|
||||
|
||||
def generate_asymmetric(self, generate_dto, kek_meta_dto, project_id):
|
||||
raise NotImplementedError(u._("Feature not implemented for PKCS11"))
|
||||
def _configure_object_cache(self):
|
||||
# Master Key cache
|
||||
self.mk_cache = {}
|
||||
self.mk_cache_lock = threading.RLock()
|
||||
|
||||
def supports(self, type_enum, algorithm=None, bit_length=None, mode=None):
|
||||
if type_enum == plugin.PluginSupportTypes.ENCRYPT_DECRYPT:
|
||||
return True
|
||||
elif type_enum == plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION:
|
||||
return True
|
||||
elif type_enum == plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
# Project KEK cache
|
||||
self.pkek_cache = collections.OrderedDict()
|
||||
self.pkek_cache_lock = threading.RLock()
|
||||
|
||||
# Session for object caching
|
||||
self.caching_session = self._get_session()
|
||||
self.caching_session_lock = threading.RLock()
|
||||
|
||||
# Cache master keys
|
||||
self._get_master_key(self.mkek_label)
|
||||
self._get_master_key(self.hmac_label)
|
||||
|
||||
def _pkek_cache_add(self, kek, label):
|
||||
with self.pkek_cache_lock:
|
||||
@ -227,7 +257,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
else:
|
||||
break
|
||||
|
||||
def _create_pkcs11(self, plugin_conf, ffi):
|
||||
def _create_pkcs11(self, plugin_conf, ffi=None):
|
||||
return pkcs11.PKCS11(
|
||||
library_path=plugin_conf.library_path,
|
||||
login_passphrase=plugin_conf.login,
|
||||
@ -237,6 +267,22 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
algorithm=plugin_conf.algorithm
|
||||
)
|
||||
|
||||
def _reinitialize_pkcs11(self):
|
||||
self.pkcs11.finalize()
|
||||
self.pkcs11 = None
|
||||
|
||||
with self.caching_session_lock:
|
||||
self.caching_session = None
|
||||
|
||||
with self.pkek_cache_lock:
|
||||
self.pkek_cache.clear()
|
||||
|
||||
with self.mk_cache_lock:
|
||||
self.mk_cache.clear()
|
||||
|
||||
self.pkcs11 = self._create_pkcs11(self.conf.p11_crypto_plugin)
|
||||
self._configure_object_cache()
|
||||
|
||||
def _get_session(self):
|
||||
return self.pkcs11.get_session()
|
||||
|
||||
@ -251,7 +297,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
with self.caching_session_lock:
|
||||
key = self.pkcs11.get_key_handle(label, session)
|
||||
if key is None:
|
||||
raise pkcs11.P11CryptoKeyHandleException(
|
||||
raise exception.P11CryptoKeyHandleException(
|
||||
u._("Could not find key labeled {0}").format(label)
|
||||
)
|
||||
self.mk_cache[label] = key
|
||||
@ -326,7 +372,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
session = self.caching_session
|
||||
if key_label in self.mk_cache or \
|
||||
self.pkcs11.get_key_handle(key_label, session) is not None:
|
||||
raise pkcs11.P11CryptoPluginKeyException(
|
||||
raise exception.P11CryptoPluginKeyException(
|
||||
u._("A master key with that label already exists")
|
||||
)
|
||||
mk = self.pkcs11.generate_key(
|
||||
@ -341,7 +387,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
||||
session = self.caching_session
|
||||
if key_label in self.mk_cache or \
|
||||
self.pkcs11.get_key_handle(key_label, session) is not None:
|
||||
raise pkcs11.P11CryptoPluginKeyException(
|
||||
raise exception.P11CryptoPluginKeyException(
|
||||
u._("A master key with that label already exists")
|
||||
)
|
||||
mk = self.pkcs11.generate_key(
|
||||
|
@ -279,6 +279,7 @@ def build_ffi():
|
||||
# FUNCTIONS
|
||||
ffi.cdef(textwrap.dedent("""
|
||||
CK_RV C_Initialize(void *);
|
||||
CK_RV C_Finalize(void *);
|
||||
CK_RV C_OpenSession(CK_SLOT_ID, CK_FLAGS, void *, CK_NOTIFY,
|
||||
CK_SESSION_HANDLE *);
|
||||
CK_RV C_CloseSession(CK_SESSION_HANDLE);
|
||||
@ -320,18 +321,6 @@ def build_ffi():
|
||||
return ffi
|
||||
|
||||
|
||||
class P11CryptoPluginKeyException(exception.BarbicanException):
|
||||
message = u._("More than one key found for label")
|
||||
|
||||
|
||||
class P11CryptoPluginException(exception.BarbicanException):
|
||||
message = u._("General exception")
|
||||
|
||||
|
||||
class P11CryptoKeyHandleException(exception.BarbicanException):
|
||||
message = u._("No key handle was found")
|
||||
|
||||
|
||||
class PKCS11(object):
|
||||
def __init__(self, library_path, login_passphrase, rw_session, slot_id,
|
||||
ffi=None, algorithm='CKM_AES_GCM'):
|
||||
@ -394,7 +383,7 @@ class PKCS11(object):
|
||||
rv = self.lib.C_FindObjectsFinal(session)
|
||||
self._check_error(rv)
|
||||
if count[0] > 1:
|
||||
raise P11CryptoPluginKeyException()
|
||||
raise exception.P11CryptoPluginKeyException()
|
||||
return key
|
||||
|
||||
def encrypt(self, key, pt_data, session):
|
||||
@ -447,7 +436,7 @@ class PKCS11(object):
|
||||
def generate_key(self, key_length, session, key_label=None,
|
||||
encrypt=False, sign=False, wrap=False, master_key=False):
|
||||
if not encrypt and not sign and not wrap:
|
||||
raise P11CryptoPluginException()
|
||||
raise exception.P11CryptoPluginException()
|
||||
if master_key and not key_label:
|
||||
raise ValueError(u._("key_label must be set for master_keys"))
|
||||
|
||||
@ -566,14 +555,20 @@ class PKCS11(object):
|
||||
rv = self.lib.C_DestroyObject(session, obj_handle)
|
||||
self._check_error(rv)
|
||||
|
||||
def finalize(self):
|
||||
rv = self.lib.C_Finalize(self.ffi.NULL)
|
||||
self._check_error(rv)
|
||||
|
||||
def _check_error(self, value):
|
||||
if value != CKR_OK:
|
||||
# TODO(jkf) Expand error handling to raise different exceptions
|
||||
# for notable errors we want to handle programmatically
|
||||
raise P11CryptoPluginException(u._(
|
||||
"HSM returned response code: {hex_value} {code}").format(
|
||||
hex_value=hex(value),
|
||||
code=ERROR_CODES.get(value, 'CKR_????')))
|
||||
code = ERROR_CODES.get(value, 'CKR_????')
|
||||
hex_code = "{hex} {code}".format(hex=hex(value), code=code)
|
||||
|
||||
if code == 'CKR_TOKEN_NOT_PRESENT':
|
||||
raise exception.P11CryptoTokenException(slot_id=self.slot_id)
|
||||
|
||||
raise exception.P11CryptoPluginException(u._(
|
||||
"HSM returned response code: {code}").format(code=hex_code))
|
||||
|
||||
def _generate_random(self, length, session):
|
||||
buf = self.ffi.new("CK_BYTE[{0}]".format(length))
|
||||
@ -635,7 +630,7 @@ class PKCS11(object):
|
||||
def _rng_self_test(self, session):
|
||||
test_random = self.generate_random(100, session)
|
||||
if test_random == b'\x00' * 100:
|
||||
raise P11CryptoPluginException(
|
||||
raise exception.P11CryptoPluginException(
|
||||
u._("Apparent RNG self-test failure."))
|
||||
|
||||
def _build_gcm_mechanism(self, iv):
|
||||
|
@ -16,6 +16,7 @@
|
||||
import mock
|
||||
import six
|
||||
|
||||
from barbican.common import exception as ex
|
||||
from barbican.model import models
|
||||
from barbican.plugin.crypto import crypto as plugin_import
|
||||
from barbican.plugin.crypto import p11_crypto
|
||||
@ -48,6 +49,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
||||
self.pkcs11.compute_hmac.return_value = b'1'
|
||||
self.pkcs11.verify_hmac.return_value = None
|
||||
self.pkcs11.destroy_object.return_value = None
|
||||
self.pkcs11.finalize.return_value = None
|
||||
|
||||
self.cfg_mock = mock.MagicMock(name='config mock')
|
||||
self.cfg_mock.p11_crypto_plugin.mkek_label = 'mkek_label'
|
||||
@ -120,7 +122,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
||||
|
||||
def test_encrypt_bad_session(self):
|
||||
self.pkcs11.get_session.return_value = mock.DEFAULT
|
||||
self.pkcs11.get_session.side_effect = pkcs11.P11CryptoPluginException(
|
||||
self.pkcs11.get_session.side_effect = ex.P11CryptoPluginException(
|
||||
'Testing error handling'
|
||||
)
|
||||
payload = b'test payload'
|
||||
@ -132,8 +134,8 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
||||
'"wrapped_key": "wrappedkey==",'
|
||||
'"mkek_label": "mkek_label",'
|
||||
'"hmac_label": "hmac_label"}')
|
||||
self.assertRaises(pkcs11.P11CryptoPluginException,
|
||||
self.plugin.encrypt,
|
||||
self.assertRaises(ex.P11CryptoPluginException,
|
||||
self.plugin._encrypt,
|
||||
encrypt_dto,
|
||||
kek_meta,
|
||||
mock.MagicMock())
|
||||
@ -172,7 +174,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
||||
|
||||
def test_decrypt_bad_session(self):
|
||||
self.pkcs11.get_session.return_value = mock.DEFAULT
|
||||
self.pkcs11.get_session.side_effect = pkcs11.P11CryptoPluginException(
|
||||
self.pkcs11.get_session.side_effect = ex.P11CryptoPluginException(
|
||||
'Testing error handling'
|
||||
)
|
||||
ct = b'ctct'
|
||||
@ -185,8 +187,8 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
||||
'"wrapped_key": "wrappedkey==",'
|
||||
'"mkek_label": "mkek_label",'
|
||||
'"hmac_label": "hmac_label"}')
|
||||
self.assertRaises(pkcs11.P11CryptoPluginException,
|
||||
self.plugin.decrypt,
|
||||
self.assertRaises(ex.P11CryptoPluginException,
|
||||
self.plugin._decrypt,
|
||||
decrypt_dto,
|
||||
kek_meta,
|
||||
kek_meta_extended,
|
||||
@ -264,7 +266,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
||||
|
||||
def test_missing_mkek(self):
|
||||
self.pkcs11.get_key_handle.return_value = None
|
||||
self.assertRaises(pkcs11.P11CryptoKeyHandleException,
|
||||
self.assertRaises(ex.P11CryptoKeyHandleException,
|
||||
self.plugin._get_master_key,
|
||||
'bad_key_label')
|
||||
|
||||
@ -282,12 +284,12 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
||||
self.assertEqual(self.pkcs11.generate_key.call_count, 1)
|
||||
|
||||
def test_cached_generate_mkek(self):
|
||||
self.assertRaises(pkcs11.P11CryptoPluginKeyException,
|
||||
self.assertRaises(ex.P11CryptoPluginKeyException,
|
||||
self.plugin._generate_mkek, 256, 'mkek_label')
|
||||
self.assertEqual(self.pkcs11.get_key_handle.call_count, 2)
|
||||
|
||||
def test_existing_generate_mkek(self):
|
||||
self.assertRaises(pkcs11.P11CryptoPluginKeyException,
|
||||
self.assertRaises(ex.P11CryptoPluginKeyException,
|
||||
self.plugin._generate_mkek, 256, 'mkek2_label')
|
||||
self.assertEqual(self.pkcs11.get_key_handle.call_count, 3)
|
||||
|
||||
@ -301,12 +303,12 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
||||
self.assertEqual(self.pkcs11.generate_key.call_count, 1)
|
||||
|
||||
def test_cached_generate_mkhk(self):
|
||||
self.assertRaises(pkcs11.P11CryptoPluginKeyException,
|
||||
self.assertRaises(ex.P11CryptoPluginKeyException,
|
||||
self.plugin._generate_mkhk, 256, 'hmac_label')
|
||||
self.assertEqual(self.pkcs11.get_key_handle.call_count, 2)
|
||||
|
||||
def test_existing_generate_mkhk(self):
|
||||
self.assertRaises(pkcs11.P11CryptoPluginKeyException,
|
||||
self.assertRaises(ex.P11CryptoPluginKeyException,
|
||||
self.plugin._generate_mkhk, 256, 'mkhk2_label')
|
||||
self.assertEqual(self.pkcs11.get_key_handle.call_count, 3)
|
||||
|
||||
@ -326,3 +328,33 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
||||
|
||||
p11 = self.plugin._create_pkcs11(self.cfg_mock.p11_crypto_plugin, ffi)
|
||||
self.assertIsInstance(p11, pkcs11.PKCS11)
|
||||
|
||||
def test_call_pkcs11_with_token_error(self):
|
||||
self.plugin._encrypt = mock.Mock()
|
||||
self.plugin._encrypt.side_effect = [ex.P11CryptoTokenException(
|
||||
'Testing error handling'
|
||||
),
|
||||
'test payload']
|
||||
self.plugin._reinitialize_pkcs11 = mock.Mock()
|
||||
self.plugin._reinitialize_pkcs11.return_value = mock.DEFAULT
|
||||
|
||||
self.plugin.encrypt(mock.MagicMock(), mock.MagicMock(),
|
||||
mock.MagicMock())
|
||||
|
||||
self.assertEqual(self.pkcs11.get_key_handle.call_count, 2)
|
||||
self.assertEqual(self.pkcs11.get_session.call_count, 1)
|
||||
self.assertEqual(self.pkcs11.return_session.call_count, 0)
|
||||
self.assertEqual(self.plugin._encrypt.call_count, 2)
|
||||
|
||||
def test_reinitialize_pkcs11(self):
|
||||
pkcs11 = self.pkcs11
|
||||
self.plugin._create_pkcs11 = mock.Mock()
|
||||
self.plugin._create_pkcs11.return_value = pkcs11
|
||||
self.plugin._configure_object_cache = mock.Mock()
|
||||
self.plugin._configure_object_cache.return_value = mock.DEFAULT
|
||||
|
||||
self.plugin._reinitialize_pkcs11()
|
||||
|
||||
self.assertEqual(self.pkcs11.finalize.call_count, 1)
|
||||
self.assertEqual(self.plugin._create_pkcs11.call_count, 1)
|
||||
self.assertEqual(self.plugin._configure_object_cache.call_count, 1)
|
||||
|
@ -14,6 +14,7 @@
|
||||
import mock
|
||||
import six
|
||||
|
||||
from barbican.common import exception
|
||||
from barbican.plugin.crypto import pkcs11
|
||||
from barbican.tests import utils
|
||||
|
||||
@ -28,6 +29,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
||||
|
||||
self.lib = mock.Mock()
|
||||
self.lib.C_Initialize.return_value = pkcs11.CKR_OK
|
||||
self.lib.C_Finalize.return_value = pkcs11.CKR_OK
|
||||
self.lib.C_OpenSession.side_effect = self._open_session
|
||||
self.lib.C_CloseSession.return_value = pkcs11.CKR_OK
|
||||
self.lib.C_GetSessionInfo.side_effect = self._get_session_user
|
||||
@ -165,7 +167,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
||||
self.ffi.buffer(buf)[:] = b'\x00' * length
|
||||
return pkcs11.CKR_OK
|
||||
self.lib.C_GenerateRandom.side_effect = _bad_generate_random
|
||||
self.assertRaises(pkcs11.P11CryptoPluginException,
|
||||
self.assertRaises(exception.P11CryptoPluginException,
|
||||
self.pkcs11._rng_self_test, mock.MagicMock())
|
||||
|
||||
def test_get_key_handle_one_key(self):
|
||||
@ -190,7 +192,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
||||
def test_get_key_handle_multiple_keys(self):
|
||||
self.lib.C_FindObjects.side_effect = self._find_objects_two
|
||||
|
||||
self.assertRaises(pkcs11.P11CryptoPluginKeyException,
|
||||
self.assertRaises(exception.P11CryptoPluginKeyException,
|
||||
self.pkcs11.get_key_handle, 'foo', mock.MagicMock())
|
||||
|
||||
self.assertEqual(self.lib.C_FindObjectsInit.call_count, 1)
|
||||
@ -213,7 +215,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
||||
self.assertEqual(self.lib.C_GenerateKey.call_count, 1)
|
||||
|
||||
def test_generate_key_no_flags(self):
|
||||
self.assertRaises(pkcs11.P11CryptoPluginException,
|
||||
self.assertRaises(exception.P11CryptoPluginException,
|
||||
self.pkcs11.generate_key, mock.MagicMock(),
|
||||
mock.MagicMock())
|
||||
|
||||
@ -337,3 +339,19 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
||||
def test_invalid_build_attributes(self):
|
||||
self.assertRaises(TypeError, self.pkcs11._build_attributes,
|
||||
[pkcs11.Attribute(pkcs11.CKA_CLASS, {})])
|
||||
|
||||
def test_finalize(self):
|
||||
self.pkcs11.finalize()
|
||||
|
||||
self.assertEqual(self.lib.C_Finalize.call_count, 1)
|
||||
|
||||
def test_check_error(self):
|
||||
self.assertIsNone(self.pkcs11._check_error(pkcs11.CKR_OK))
|
||||
|
||||
def test_check_error_with_without_specific_handling(self):
|
||||
self.assertRaises(exception.P11CryptoPluginException,
|
||||
self.pkcs11._check_error, 5)
|
||||
|
||||
def test_check_error_with_token_error(self):
|
||||
self.assertRaises(exception.P11CryptoTokenException,
|
||||
self.pkcs11._check_error, 0xe0)
|
||||
|
Loading…
x
Reference in New Issue
Block a user