Merge "Add retry for recoverable PKCS11 errors"
This commit is contained in:
commit
ceec5bcd30
|
@ -510,3 +510,23 @@ class SubCADeletionErrors(BarbicanHTTPException):
|
|||
message = u._("Errors returned by CA when attempting to delete "
|
||||
"subordinate CA: %(reason)")
|
||||
client_message = message
|
||||
|
||||
|
||||
class PKCS11Exception(BarbicanException):
|
||||
message = u._("There was an error with the PKCS#11 library.")
|
||||
|
||||
|
||||
class P11CryptoPluginKeyException(PKCS11Exception):
|
||||
message = u._("More than one key found for label")
|
||||
|
||||
|
||||
class P11CryptoPluginException(PKCS11Exception):
|
||||
message = u._("General exception")
|
||||
|
||||
|
||||
class P11CryptoKeyHandleException(PKCS11Exception):
|
||||
message = u._("No key handle was found")
|
||||
|
||||
|
||||
class P11CryptoTokenException(PKCS11Exception):
|
||||
message = u._("No token was found in slot %(slot_id)s")
|
||||
|
|
|
@ -20,6 +20,7 @@ from oslo_config import cfg
|
|||
from oslo_serialization import jsonutils as json
|
||||
|
||||
from barbican.common import config
|
||||
from barbican.common import exception
|
||||
from barbican.common import utils
|
||||
from barbican import i18n as u
|
||||
from barbican.plugin.crypto import crypto as plugin
|
||||
|
@ -95,23 +96,48 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
self.pkek_cache_limit = plugin_conf.pkek_cache_limit
|
||||
self.algorithm = plugin_conf.algorithm
|
||||
|
||||
# Master Key cache
|
||||
self.mk_cache = {}
|
||||
self.mk_cache_lock = threading.RLock()
|
||||
|
||||
# Project KEK cache
|
||||
self.pkek_cache = collections.OrderedDict()
|
||||
self.pkek_cache_lock = threading.RLock()
|
||||
|
||||
# Session for object caching
|
||||
self.caching_session = self.pkcs11.get_session()
|
||||
self.caching_session_lock = threading.RLock()
|
||||
|
||||
# Cache master keys
|
||||
self._get_master_key(self.mkek_label)
|
||||
self._get_master_key(self.hmac_label)
|
||||
self._configure_object_cache()
|
||||
|
||||
def encrypt(self, encrypt_dto, kek_meta_dto, project_id):
|
||||
return self._call_pkcs11(self._encrypt, encrypt_dto, kek_meta_dto,
|
||||
project_id)
|
||||
|
||||
def decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended,
|
||||
project_id):
|
||||
return self._call_pkcs11(self._decrypt, decrypt_dto, kek_meta_dto,
|
||||
kek_meta_extended, project_id)
|
||||
|
||||
def bind_kek_metadata(self, kek_meta_dto):
|
||||
return self._call_pkcs11(self._bind_kek_metadata, kek_meta_dto)
|
||||
|
||||
def generate_symmetric(self, generate_dto, kek_meta_dto, project_id):
|
||||
return self._call_pkcs11(self._generate_symmetric, generate_dto,
|
||||
kek_meta_dto, project_id)
|
||||
|
||||
def generate_asymmetric(self, generate_dto, kek_meta_dto, project_id):
|
||||
raise NotImplementedError(u._("Feature not implemented for PKCS11"))
|
||||
|
||||
def supports(self, type_enum, algorithm=None, bit_length=None, mode=None):
|
||||
if type_enum == plugin.PluginSupportTypes.ENCRYPT_DECRYPT:
|
||||
return True
|
||||
elif type_enum == plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION:
|
||||
return True
|
||||
elif type_enum == plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
||||
def _call_pkcs11(self, func, *args, **kwargs):
|
||||
# Wrap pkcs11 calls to enable a single retry when exceptions are raised
|
||||
# that can be fixed by reinitializing the pkcs11 library
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except (exception.PKCS11Exception) as pe:
|
||||
LOG.warn("Reinitializing PKCS#11 library: {e}".format(e=pe))
|
||||
self._reinitialize_pkcs11()
|
||||
return func(*args, **kwargs)
|
||||
|
||||
def _encrypt(self, encrypt_dto, kek_meta_dto, project_id):
|
||||
kek = self._load_kek_from_meta_dto(kek_meta_dto)
|
||||
try:
|
||||
session = self._get_session()
|
||||
|
@ -127,8 +153,8 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
)
|
||||
return plugin.ResponseDTO(ct_data['ct'], kek_meta_extended)
|
||||
|
||||
def decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended,
|
||||
project_id):
|
||||
def _decrypt(self, decrypt_dto, kek_meta_dto, kek_meta_extended,
|
||||
project_id):
|
||||
kek = self._load_kek_from_meta_dto(kek_meta_dto)
|
||||
meta_extended = json.loads(kek_meta_extended)
|
||||
iv = base64.b64decode(meta_extended['iv'])
|
||||
|
@ -144,7 +170,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
|
||||
return pt_data
|
||||
|
||||
def bind_kek_metadata(self, kek_meta_dto):
|
||||
def _bind_kek_metadata(self, kek_meta_dto):
|
||||
if not kek_meta_dto.plugin_meta:
|
||||
# Generate wrapped kek and jsonify
|
||||
wkek = self._generate_wrapped_kek(
|
||||
|
@ -158,7 +184,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
kek_meta_dto.mode = 'CBC'
|
||||
return kek_meta_dto
|
||||
|
||||
def generate_symmetric(self, generate_dto, kek_meta_dto, project_id):
|
||||
def _generate_symmetric(self, generate_dto, kek_meta_dto, project_id):
|
||||
kek = self._load_kek_from_meta_dto(kek_meta_dto)
|
||||
byte_length = int(generate_dto.bit_length) // 8
|
||||
|
||||
|
@ -175,18 +201,22 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
)
|
||||
return plugin.ResponseDTO(ct_data['ct'], kek_meta_extended)
|
||||
|
||||
def generate_asymmetric(self, generate_dto, kek_meta_dto, project_id):
|
||||
raise NotImplementedError(u._("Feature not implemented for PKCS11"))
|
||||
def _configure_object_cache(self):
|
||||
# Master Key cache
|
||||
self.mk_cache = {}
|
||||
self.mk_cache_lock = threading.RLock()
|
||||
|
||||
def supports(self, type_enum, algorithm=None, bit_length=None, mode=None):
|
||||
if type_enum == plugin.PluginSupportTypes.ENCRYPT_DECRYPT:
|
||||
return True
|
||||
elif type_enum == plugin.PluginSupportTypes.SYMMETRIC_KEY_GENERATION:
|
||||
return True
|
||||
elif type_enum == plugin.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
# Project KEK cache
|
||||
self.pkek_cache = collections.OrderedDict()
|
||||
self.pkek_cache_lock = threading.RLock()
|
||||
|
||||
# Session for object caching
|
||||
self.caching_session = self._get_session()
|
||||
self.caching_session_lock = threading.RLock()
|
||||
|
||||
# Cache master keys
|
||||
self._get_master_key(self.mkek_label)
|
||||
self._get_master_key(self.hmac_label)
|
||||
|
||||
def _pkek_cache_add(self, kek, label):
|
||||
with self.pkek_cache_lock:
|
||||
|
@ -227,7 +257,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
else:
|
||||
break
|
||||
|
||||
def _create_pkcs11(self, plugin_conf, ffi):
|
||||
def _create_pkcs11(self, plugin_conf, ffi=None):
|
||||
return pkcs11.PKCS11(
|
||||
library_path=plugin_conf.library_path,
|
||||
login_passphrase=plugin_conf.login,
|
||||
|
@ -237,6 +267,22 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
algorithm=plugin_conf.algorithm
|
||||
)
|
||||
|
||||
def _reinitialize_pkcs11(self):
|
||||
self.pkcs11.finalize()
|
||||
self.pkcs11 = None
|
||||
|
||||
with self.caching_session_lock:
|
||||
self.caching_session = None
|
||||
|
||||
with self.pkek_cache_lock:
|
||||
self.pkek_cache.clear()
|
||||
|
||||
with self.mk_cache_lock:
|
||||
self.mk_cache.clear()
|
||||
|
||||
self.pkcs11 = self._create_pkcs11(self.conf.p11_crypto_plugin)
|
||||
self._configure_object_cache()
|
||||
|
||||
def _get_session(self):
|
||||
return self.pkcs11.get_session()
|
||||
|
||||
|
@ -251,7 +297,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
with self.caching_session_lock:
|
||||
key = self.pkcs11.get_key_handle(label, session)
|
||||
if key is None:
|
||||
raise pkcs11.P11CryptoKeyHandleException(
|
||||
raise exception.P11CryptoKeyHandleException(
|
||||
u._("Could not find key labeled {0}").format(label)
|
||||
)
|
||||
self.mk_cache[label] = key
|
||||
|
@ -326,7 +372,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
session = self.caching_session
|
||||
if key_label in self.mk_cache or \
|
||||
self.pkcs11.get_key_handle(key_label, session) is not None:
|
||||
raise pkcs11.P11CryptoPluginKeyException(
|
||||
raise exception.P11CryptoPluginKeyException(
|
||||
u._("A master key with that label already exists")
|
||||
)
|
||||
mk = self.pkcs11.generate_key(
|
||||
|
@ -341,7 +387,7 @@ class P11CryptoPlugin(plugin.CryptoPluginBase):
|
|||
session = self.caching_session
|
||||
if key_label in self.mk_cache or \
|
||||
self.pkcs11.get_key_handle(key_label, session) is not None:
|
||||
raise pkcs11.P11CryptoPluginKeyException(
|
||||
raise exception.P11CryptoPluginKeyException(
|
||||
u._("A master key with that label already exists")
|
||||
)
|
||||
mk = self.pkcs11.generate_key(
|
||||
|
|
|
@ -279,6 +279,7 @@ def build_ffi():
|
|||
# FUNCTIONS
|
||||
ffi.cdef(textwrap.dedent("""
|
||||
CK_RV C_Initialize(void *);
|
||||
CK_RV C_Finalize(void *);
|
||||
CK_RV C_OpenSession(CK_SLOT_ID, CK_FLAGS, void *, CK_NOTIFY,
|
||||
CK_SESSION_HANDLE *);
|
||||
CK_RV C_CloseSession(CK_SESSION_HANDLE);
|
||||
|
@ -320,18 +321,6 @@ def build_ffi():
|
|||
return ffi
|
||||
|
||||
|
||||
class P11CryptoPluginKeyException(exception.BarbicanException):
|
||||
message = u._("More than one key found for label")
|
||||
|
||||
|
||||
class P11CryptoPluginException(exception.BarbicanException):
|
||||
message = u._("General exception")
|
||||
|
||||
|
||||
class P11CryptoKeyHandleException(exception.BarbicanException):
|
||||
message = u._("No key handle was found")
|
||||
|
||||
|
||||
class PKCS11(object):
|
||||
def __init__(self, library_path, login_passphrase, rw_session, slot_id,
|
||||
ffi=None, algorithm='CKM_AES_GCM'):
|
||||
|
@ -394,7 +383,7 @@ class PKCS11(object):
|
|||
rv = self.lib.C_FindObjectsFinal(session)
|
||||
self._check_error(rv)
|
||||
if count[0] > 1:
|
||||
raise P11CryptoPluginKeyException()
|
||||
raise exception.P11CryptoPluginKeyException()
|
||||
return key
|
||||
|
||||
def encrypt(self, key, pt_data, session):
|
||||
|
@ -447,7 +436,7 @@ class PKCS11(object):
|
|||
def generate_key(self, key_length, session, key_label=None,
|
||||
encrypt=False, sign=False, wrap=False, master_key=False):
|
||||
if not encrypt and not sign and not wrap:
|
||||
raise P11CryptoPluginException()
|
||||
raise exception.P11CryptoPluginException()
|
||||
if master_key and not key_label:
|
||||
raise ValueError(u._("key_label must be set for master_keys"))
|
||||
|
||||
|
@ -566,14 +555,20 @@ class PKCS11(object):
|
|||
rv = self.lib.C_DestroyObject(session, obj_handle)
|
||||
self._check_error(rv)
|
||||
|
||||
def finalize(self):
|
||||
rv = self.lib.C_Finalize(self.ffi.NULL)
|
||||
self._check_error(rv)
|
||||
|
||||
def _check_error(self, value):
|
||||
if value != CKR_OK:
|
||||
# TODO(jkf) Expand error handling to raise different exceptions
|
||||
# for notable errors we want to handle programmatically
|
||||
raise P11CryptoPluginException(u._(
|
||||
"HSM returned response code: {hex_value} {code}").format(
|
||||
hex_value=hex(value),
|
||||
code=ERROR_CODES.get(value, 'CKR_????')))
|
||||
code = ERROR_CODES.get(value, 'CKR_????')
|
||||
hex_code = "{hex} {code}".format(hex=hex(value), code=code)
|
||||
|
||||
if code == 'CKR_TOKEN_NOT_PRESENT':
|
||||
raise exception.P11CryptoTokenException(slot_id=self.slot_id)
|
||||
|
||||
raise exception.P11CryptoPluginException(u._(
|
||||
"HSM returned response code: {code}").format(code=hex_code))
|
||||
|
||||
def _generate_random(self, length, session):
|
||||
buf = self.ffi.new("CK_BYTE[{0}]".format(length))
|
||||
|
@ -635,7 +630,7 @@ class PKCS11(object):
|
|||
def _rng_self_test(self, session):
|
||||
test_random = self.generate_random(100, session)
|
||||
if test_random == b'\x00' * 100:
|
||||
raise P11CryptoPluginException(
|
||||
raise exception.P11CryptoPluginException(
|
||||
u._("Apparent RNG self-test failure."))
|
||||
|
||||
def _build_gcm_mechanism(self, iv):
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
import mock
|
||||
import six
|
||||
|
||||
from barbican.common import exception as ex
|
||||
from barbican.model import models
|
||||
from barbican.plugin.crypto import crypto as plugin_import
|
||||
from barbican.plugin.crypto import p11_crypto
|
||||
|
@ -48,6 +49,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
self.pkcs11.compute_hmac.return_value = b'1'
|
||||
self.pkcs11.verify_hmac.return_value = None
|
||||
self.pkcs11.destroy_object.return_value = None
|
||||
self.pkcs11.finalize.return_value = None
|
||||
|
||||
self.cfg_mock = mock.MagicMock(name='config mock')
|
||||
self.cfg_mock.p11_crypto_plugin.mkek_label = 'mkek_label'
|
||||
|
@ -120,7 +122,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
|
||||
def test_encrypt_bad_session(self):
|
||||
self.pkcs11.get_session.return_value = mock.DEFAULT
|
||||
self.pkcs11.get_session.side_effect = pkcs11.P11CryptoPluginException(
|
||||
self.pkcs11.get_session.side_effect = ex.P11CryptoPluginException(
|
||||
'Testing error handling'
|
||||
)
|
||||
payload = b'test payload'
|
||||
|
@ -132,8 +134,8 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
'"wrapped_key": "wrappedkey==",'
|
||||
'"mkek_label": "mkek_label",'
|
||||
'"hmac_label": "hmac_label"}')
|
||||
self.assertRaises(pkcs11.P11CryptoPluginException,
|
||||
self.plugin.encrypt,
|
||||
self.assertRaises(ex.P11CryptoPluginException,
|
||||
self.plugin._encrypt,
|
||||
encrypt_dto,
|
||||
kek_meta,
|
||||
mock.MagicMock())
|
||||
|
@ -172,7 +174,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
|
||||
def test_decrypt_bad_session(self):
|
||||
self.pkcs11.get_session.return_value = mock.DEFAULT
|
||||
self.pkcs11.get_session.side_effect = pkcs11.P11CryptoPluginException(
|
||||
self.pkcs11.get_session.side_effect = ex.P11CryptoPluginException(
|
||||
'Testing error handling'
|
||||
)
|
||||
ct = b'ctct'
|
||||
|
@ -185,8 +187,8 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
'"wrapped_key": "wrappedkey==",'
|
||||
'"mkek_label": "mkek_label",'
|
||||
'"hmac_label": "hmac_label"}')
|
||||
self.assertRaises(pkcs11.P11CryptoPluginException,
|
||||
self.plugin.decrypt,
|
||||
self.assertRaises(ex.P11CryptoPluginException,
|
||||
self.plugin._decrypt,
|
||||
decrypt_dto,
|
||||
kek_meta,
|
||||
kek_meta_extended,
|
||||
|
@ -264,7 +266,7 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
|
||||
def test_missing_mkek(self):
|
||||
self.pkcs11.get_key_handle.return_value = None
|
||||
self.assertRaises(pkcs11.P11CryptoKeyHandleException,
|
||||
self.assertRaises(ex.P11CryptoKeyHandleException,
|
||||
self.plugin._get_master_key,
|
||||
'bad_key_label')
|
||||
|
||||
|
@ -282,12 +284,12 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
self.assertEqual(self.pkcs11.generate_key.call_count, 1)
|
||||
|
||||
def test_cached_generate_mkek(self):
|
||||
self.assertRaises(pkcs11.P11CryptoPluginKeyException,
|
||||
self.assertRaises(ex.P11CryptoPluginKeyException,
|
||||
self.plugin._generate_mkek, 256, 'mkek_label')
|
||||
self.assertEqual(self.pkcs11.get_key_handle.call_count, 2)
|
||||
|
||||
def test_existing_generate_mkek(self):
|
||||
self.assertRaises(pkcs11.P11CryptoPluginKeyException,
|
||||
self.assertRaises(ex.P11CryptoPluginKeyException,
|
||||
self.plugin._generate_mkek, 256, 'mkek2_label')
|
||||
self.assertEqual(self.pkcs11.get_key_handle.call_count, 3)
|
||||
|
||||
|
@ -301,12 +303,12 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
self.assertEqual(self.pkcs11.generate_key.call_count, 1)
|
||||
|
||||
def test_cached_generate_mkhk(self):
|
||||
self.assertRaises(pkcs11.P11CryptoPluginKeyException,
|
||||
self.assertRaises(ex.P11CryptoPluginKeyException,
|
||||
self.plugin._generate_mkhk, 256, 'hmac_label')
|
||||
self.assertEqual(self.pkcs11.get_key_handle.call_count, 2)
|
||||
|
||||
def test_existing_generate_mkhk(self):
|
||||
self.assertRaises(pkcs11.P11CryptoPluginKeyException,
|
||||
self.assertRaises(ex.P11CryptoPluginKeyException,
|
||||
self.plugin._generate_mkhk, 256, 'mkhk2_label')
|
||||
self.assertEqual(self.pkcs11.get_key_handle.call_count, 3)
|
||||
|
||||
|
@ -326,3 +328,33 @@ class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
|
|||
|
||||
p11 = self.plugin._create_pkcs11(self.cfg_mock.p11_crypto_plugin, ffi)
|
||||
self.assertIsInstance(p11, pkcs11.PKCS11)
|
||||
|
||||
def test_call_pkcs11_with_token_error(self):
|
||||
self.plugin._encrypt = mock.Mock()
|
||||
self.plugin._encrypt.side_effect = [ex.P11CryptoTokenException(
|
||||
'Testing error handling'
|
||||
),
|
||||
'test payload']
|
||||
self.plugin._reinitialize_pkcs11 = mock.Mock()
|
||||
self.plugin._reinitialize_pkcs11.return_value = mock.DEFAULT
|
||||
|
||||
self.plugin.encrypt(mock.MagicMock(), mock.MagicMock(),
|
||||
mock.MagicMock())
|
||||
|
||||
self.assertEqual(self.pkcs11.get_key_handle.call_count, 2)
|
||||
self.assertEqual(self.pkcs11.get_session.call_count, 1)
|
||||
self.assertEqual(self.pkcs11.return_session.call_count, 0)
|
||||
self.assertEqual(self.plugin._encrypt.call_count, 2)
|
||||
|
||||
def test_reinitialize_pkcs11(self):
|
||||
pkcs11 = self.pkcs11
|
||||
self.plugin._create_pkcs11 = mock.Mock()
|
||||
self.plugin._create_pkcs11.return_value = pkcs11
|
||||
self.plugin._configure_object_cache = mock.Mock()
|
||||
self.plugin._configure_object_cache.return_value = mock.DEFAULT
|
||||
|
||||
self.plugin._reinitialize_pkcs11()
|
||||
|
||||
self.assertEqual(self.pkcs11.finalize.call_count, 1)
|
||||
self.assertEqual(self.plugin._create_pkcs11.call_count, 1)
|
||||
self.assertEqual(self.plugin._configure_object_cache.call_count, 1)
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
import mock
|
||||
import six
|
||||
|
||||
from barbican.common import exception
|
||||
from barbican.plugin.crypto import pkcs11
|
||||
from barbican.tests import utils
|
||||
|
||||
|
@ -28,6 +29,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
|||
|
||||
self.lib = mock.Mock()
|
||||
self.lib.C_Initialize.return_value = pkcs11.CKR_OK
|
||||
self.lib.C_Finalize.return_value = pkcs11.CKR_OK
|
||||
self.lib.C_OpenSession.side_effect = self._open_session
|
||||
self.lib.C_CloseSession.return_value = pkcs11.CKR_OK
|
||||
self.lib.C_GetSessionInfo.side_effect = self._get_session_user
|
||||
|
@ -165,7 +167,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
|||
self.ffi.buffer(buf)[:] = b'\x00' * length
|
||||
return pkcs11.CKR_OK
|
||||
self.lib.C_GenerateRandom.side_effect = _bad_generate_random
|
||||
self.assertRaises(pkcs11.P11CryptoPluginException,
|
||||
self.assertRaises(exception.P11CryptoPluginException,
|
||||
self.pkcs11._rng_self_test, mock.MagicMock())
|
||||
|
||||
def test_get_key_handle_one_key(self):
|
||||
|
@ -190,7 +192,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
|||
def test_get_key_handle_multiple_keys(self):
|
||||
self.lib.C_FindObjects.side_effect = self._find_objects_two
|
||||
|
||||
self.assertRaises(pkcs11.P11CryptoPluginKeyException,
|
||||
self.assertRaises(exception.P11CryptoPluginKeyException,
|
||||
self.pkcs11.get_key_handle, 'foo', mock.MagicMock())
|
||||
|
||||
self.assertEqual(self.lib.C_FindObjectsInit.call_count, 1)
|
||||
|
@ -213,7 +215,7 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
|||
self.assertEqual(self.lib.C_GenerateKey.call_count, 1)
|
||||
|
||||
def test_generate_key_no_flags(self):
|
||||
self.assertRaises(pkcs11.P11CryptoPluginException,
|
||||
self.assertRaises(exception.P11CryptoPluginException,
|
||||
self.pkcs11.generate_key, mock.MagicMock(),
|
||||
mock.MagicMock())
|
||||
|
||||
|
@ -337,3 +339,19 @@ class WhenTestingPKCS11(utils.BaseTestCase):
|
|||
def test_invalid_build_attributes(self):
|
||||
self.assertRaises(TypeError, self.pkcs11._build_attributes,
|
||||
[pkcs11.Attribute(pkcs11.CKA_CLASS, {})])
|
||||
|
||||
def test_finalize(self):
|
||||
self.pkcs11.finalize()
|
||||
|
||||
self.assertEqual(self.lib.C_Finalize.call_count, 1)
|
||||
|
||||
def test_check_error(self):
|
||||
self.assertIsNone(self.pkcs11._check_error(pkcs11.CKR_OK))
|
||||
|
||||
def test_check_error_with_without_specific_handling(self):
|
||||
self.assertRaises(exception.P11CryptoPluginException,
|
||||
self.pkcs11._check_error, 5)
|
||||
|
||||
def test_check_error_with_token_error(self):
|
||||
self.assertRaises(exception.P11CryptoTokenException,
|
||||
self.pkcs11._check_error, 0xe0)
|
||||
|
|
Loading…
Reference in New Issue