Update the KMIPSecretStore tests to not test PyKMIP internals
The KMIPSecretStore unit test suite tests the internals of the PyKMIP ProxyKmipClient. An upcoming PyKMIP release modifies these internals, breaking the Barbican unit tests. This change updates the unit tests to only test the ProxyKmipClient interface, making sure that the KMIPSecretStore still functions correctly while avoiding dependencies on PyKMIP internals. Closes-Bug: #1522856 Change-Id: I3b38c6cc24a70cf13dbf42e0fd25d183249c91dd
This commit is contained in:
parent
ea725053c0
commit
a27d201c05
@ -13,7 +13,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import base64
|
||||
import socket
|
||||
import ssl
|
||||
import stat
|
||||
|
||||
@ -23,87 +22,64 @@ from barbican.plugin.interface import secret_store
|
||||
from barbican.tests import keys
|
||||
from barbican.tests import utils
|
||||
|
||||
from kmip.core import attributes as attr
|
||||
from kmip.core import enums
|
||||
from kmip.core.factories import attributes
|
||||
from kmip.core.messages import contents
|
||||
from kmip.core import misc
|
||||
from kmip.core import objects
|
||||
from kmip.core import secrets
|
||||
from kmip.core.secrets import OpaqueObject as Opaque
|
||||
from kmip.services import kmip_client as proxy
|
||||
from kmip.services import results
|
||||
from kmip.pie import client
|
||||
from kmip.pie import objects
|
||||
|
||||
from barbican.plugin import kmip_secret_store as kss
|
||||
from barbican.plugin.util import translations
|
||||
|
||||
|
||||
def get_sample_opaque_secret():
|
||||
opaque_type = Opaque.OpaqueDataType(enums.OpaqueDataType.NONE)
|
||||
opaque_value = Opaque.OpaqueDataValue(base64.b64decode(
|
||||
utils.get_symmetric_key()))
|
||||
return Opaque(opaque_type, opaque_value)
|
||||
return objects.OpaqueObject(
|
||||
base64.b64decode(utils.get_symmetric_key()),
|
||||
enums.OpaqueDataType.NONE)
|
||||
|
||||
|
||||
def get_sample_symmetric_key(key_b64=utils.get_symmetric_key(),
|
||||
key_length=128,
|
||||
algorithm=enums.CryptographicAlgorithm.AES):
|
||||
key_material = objects.KeyMaterial(base64.b64decode(key_b64))
|
||||
key_value = objects.KeyValue(key_material)
|
||||
key_block = objects.KeyBlock(
|
||||
key_format_type=misc.KeyFormatType(enums.KeyFormatType.RAW),
|
||||
key_compression_type=None,
|
||||
key_value=key_value,
|
||||
cryptographic_algorithm=attr.CryptographicAlgorithm(algorithm),
|
||||
cryptographic_length=attr.CryptographicLength(key_length),
|
||||
key_wrapping_data=None)
|
||||
return secrets.SymmetricKey(key_block)
|
||||
return objects.SymmetricKey(
|
||||
algorithm,
|
||||
key_length,
|
||||
base64.b64decode(key_b64))
|
||||
|
||||
|
||||
def get_sample_public_key(pkcs1=False):
|
||||
if pkcs1:
|
||||
public_key = kss.get_public_key_der_pkcs1(keys.get_public_key_pem())
|
||||
key_format_type = misc.KeyFormatType(enums.KeyFormatType.PKCS_1)
|
||||
public_key_value = kss.get_public_key_der_pkcs1(
|
||||
keys.get_public_key_pem())
|
||||
key_format_type = enums.KeyFormatType.PKCS_1
|
||||
else:
|
||||
public_key = keys.get_public_key_der()
|
||||
key_format_type = misc.KeyFormatType(enums.KeyFormatType.X_509)
|
||||
public_key_value = keys.get_public_key_der()
|
||||
key_format_type = enums.KeyFormatType.X_509
|
||||
|
||||
key_material = objects.KeyMaterial(public_key)
|
||||
key_value = objects.KeyValue(key_material)
|
||||
key_block = objects.KeyBlock(
|
||||
key_format_type=key_format_type,
|
||||
key_compression_type=None,
|
||||
key_value=key_value,
|
||||
cryptographic_algorithm=attr.CryptographicAlgorithm(
|
||||
enums.CryptographicAlgorithm.RSA),
|
||||
cryptographic_length=attr.CryptographicLength(2048),
|
||||
key_wrapping_data=None)
|
||||
return secrets.PublicKey(key_block)
|
||||
return objects.PublicKey(
|
||||
enums.CryptographicAlgorithm.RSA,
|
||||
2048,
|
||||
public_key_value,
|
||||
key_format_type)
|
||||
|
||||
|
||||
def get_sample_private_key(pkcs1=False):
|
||||
if pkcs1:
|
||||
private_key = kss.get_private_key_der_pkcs1(keys.get_private_key_pem())
|
||||
key_format_type = misc.KeyFormatType(enums.KeyFormatType.PKCS_1)
|
||||
private_key_value = kss.get_private_key_der_pkcs1(
|
||||
keys.get_private_key_pem())
|
||||
key_format_type = enums.KeyFormatType.PKCS_1
|
||||
else:
|
||||
private_key = keys.get_private_key_der()
|
||||
key_format_type = misc.KeyFormatType(enums.KeyFormatType.PKCS_8)
|
||||
key_material = objects.KeyMaterial(private_key)
|
||||
key_value = objects.KeyValue(key_material)
|
||||
key_block = objects.KeyBlock(
|
||||
key_format_type=key_format_type,
|
||||
key_compression_type=None,
|
||||
key_value=key_value,
|
||||
cryptographic_algorithm=attr.CryptographicAlgorithm(
|
||||
enums.CryptographicAlgorithm.RSA),
|
||||
cryptographic_length=attr.CryptographicLength(2048),
|
||||
key_wrapping_data=None)
|
||||
return secrets.PrivateKey(key_block)
|
||||
private_key_value = keys.get_private_key_der()
|
||||
key_format_type = enums.KeyFormatType.PKCS_8
|
||||
|
||||
return objects.PrivateKey(
|
||||
enums.CryptographicAlgorithm.RSA,
|
||||
2048,
|
||||
private_key_value,
|
||||
key_format_type)
|
||||
|
||||
|
||||
def get_sample_certificate():
|
||||
return secrets.Certificate(
|
||||
certificate_type=enums.CertificateTypeEnum.X_509,
|
||||
certificate_value=keys.get_certificate_der())
|
||||
return objects.X509Certificate(
|
||||
keys.get_certificate_der())
|
||||
|
||||
|
||||
@utils.parameterized_test_case
|
||||
@ -141,40 +117,25 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
|
||||
self.sample_secret = get_sample_symmetric_key()
|
||||
|
||||
self.secret_store.client.proxy.open = mock.MagicMock(
|
||||
proxy.KMIPProxy().open)
|
||||
self.secret_store.client.proxy.close = mock.MagicMock(
|
||||
proxy.KMIPProxy().close)
|
||||
self.secret_store.client.open = mock.MagicMock(
|
||||
spec=client.ProxyKmipClient.open)
|
||||
self.secret_store.client.close = mock.MagicMock(
|
||||
spec=client.ProxyKmipClient.close)
|
||||
|
||||
self.secret_store.client.proxy.create = mock.MagicMock(
|
||||
proxy.KMIPProxy().create, return_value=results.CreateResult(
|
||||
contents.ResultStatus(enums.ResultStatus.SUCCESS),
|
||||
uuid=attr.UniqueIdentifier(
|
||||
self.symmetric_key_uuid)))
|
||||
self.secret_store.client.create = mock.MagicMock(
|
||||
return_value=self.symmetric_key_uuid)
|
||||
|
||||
self.secret_store.client.proxy.create_key_pair = mock.MagicMock(
|
||||
proxy.KMIPProxy().create_key_pair,
|
||||
return_value=results.CreateKeyPairResult(
|
||||
contents.ResultStatus(enums.ResultStatus.SUCCESS),
|
||||
private_key_uuid=attr.UniqueIdentifier(self.private_key_uuid),
|
||||
public_key_uuid=attr.UniqueIdentifier(self.public_key_uuid)))
|
||||
self.secret_store.client.create_key_pair = mock.MagicMock(
|
||||
return_value=(self.public_key_uuid, self.private_key_uuid))
|
||||
|
||||
self.secret_store.client.proxy.register = mock.MagicMock(
|
||||
proxy.KMIPProxy().register, return_value=results.RegisterResult(
|
||||
contents.ResultStatus(enums.ResultStatus.SUCCESS),
|
||||
uuid=attr.UniqueIdentifier('uuid')))
|
||||
self.secret_store.client.register = mock.MagicMock(
|
||||
return_value='uuid')
|
||||
|
||||
self.secret_store.client.proxy.destroy = mock.MagicMock(
|
||||
proxy.KMIPProxy().destroy, return_value=results.DestroyResult(
|
||||
contents.ResultStatus(enums.ResultStatus.SUCCESS)))
|
||||
self.secret_store.client.destroy = mock.MagicMock(
|
||||
return_value=None)
|
||||
|
||||
self.secret_store.client.proxy.get = mock.MagicMock(
|
||||
proxy.KMIPProxy().get, return_value=results.GetResult(
|
||||
contents.ResultStatus(enums.ResultStatus.SUCCESS),
|
||||
object_type=attr.ObjectType(enums.ObjectType.SYMMETRIC_KEY),
|
||||
secret=self.sample_secret))
|
||||
|
||||
self.attribute_factory = attributes.AttributeFactory()
|
||||
self.secret_store.client.get = mock.MagicMock(
|
||||
return_value=self.sample_secret)
|
||||
|
||||
# --------------- TEST CONFIG OPTIONS ---------------------------------
|
||||
|
||||
@ -244,9 +205,9 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
128, 'mode')
|
||||
self.secret_store.generate_symmetric_key(key_spec)
|
||||
|
||||
self.secret_store.client.proxy.create.assert_called_once_with(
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
mock.ANY)
|
||||
self.secret_store.client.create.assert_called_once_with(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
128)
|
||||
|
||||
def test_generate_symmetric_key_return_value(self):
|
||||
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
|
||||
@ -258,9 +219,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
self.assertEqual(expected, return_value)
|
||||
|
||||
def test_generate_symmetric_key_server_error_occurs(self):
|
||||
self.secret_store.client.proxy.create = mock.MagicMock(
|
||||
proxy.KMIPProxy().create, return_value=results.CreateResult(
|
||||
contents.ResultStatus(enums.ResultStatus.OPERATION_FAILED)))
|
||||
self.secret_store.client.create.side_effect = Exception
|
||||
|
||||
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
|
||||
128, 'mode')
|
||||
@ -294,8 +253,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
key_spec)
|
||||
|
||||
def test_generate_symmetric_key_error_opening_connection(self):
|
||||
self.secret_store.client.proxy.open = mock.Mock(
|
||||
side_effect=socket.error)
|
||||
self.secret_store.client.open.side_effect = Exception
|
||||
|
||||
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
|
||||
128, 'mode')
|
||||
@ -311,8 +269,9 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
2048, 'mode')
|
||||
self.secret_store.generate_asymmetric_key(key_spec)
|
||||
|
||||
self.secret_store.client.proxy.create_key_pair.assert_called_once_with(
|
||||
common_template_attribute=mock.ANY)
|
||||
self.secret_store.client.create_key_pair.assert_called_once_with(
|
||||
enums.CryptographicAlgorithm.RSA,
|
||||
2048)
|
||||
|
||||
def test_generate_asymmetric_key_return_value(self):
|
||||
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA,
|
||||
@ -334,10 +293,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
expected_passphrase_meta, return_value.passphrase_meta)
|
||||
|
||||
def test_generate_asymmetric_key_server_error_occurs(self):
|
||||
self.secret_store.client.proxy.create_key_pair = mock.MagicMock(
|
||||
proxy.KMIPProxy().create_key_pair,
|
||||
return_value=results.CreateKeyPairResult(
|
||||
contents.ResultStatus(enums.ResultStatus.OPERATION_FAILED)))
|
||||
self.secret_store.client.create_key_pair.side_effect = Exception
|
||||
|
||||
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA,
|
||||
2048, 'mode')
|
||||
@ -378,8 +334,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
key_spec)
|
||||
|
||||
def test_generate_asymmetric_key_error_opening_connection(self):
|
||||
self.secret_store.client.proxy.open = mock.Mock(
|
||||
side_effect=socket.error)
|
||||
self.secret_store.client.open.side_effect = Exception
|
||||
|
||||
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA,
|
||||
2048, 'mode')
|
||||
@ -400,22 +355,11 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
'content_type',
|
||||
transport_key=None)
|
||||
self.secret_store.store_secret(secret_dto)
|
||||
self.secret_store.client.proxy.register.assert_called_once_with(
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
mock.ANY,
|
||||
mock.ANY)
|
||||
register_mock = self.secret_store.client.proxy.register
|
||||
register_call_args, _ = register_mock.call_args
|
||||
actual_secret = register_call_args[2]
|
||||
self.assertEqual(
|
||||
128,
|
||||
actual_secret.key_block.cryptographic_length.value)
|
||||
self.assertEqual(
|
||||
attr.CryptographicAlgorithm(enums.CryptographicAlgorithm.AES),
|
||||
actual_secret.key_block.cryptographic_algorithm)
|
||||
self.assertEqual(
|
||||
base64.b64decode(sym_key),
|
||||
actual_secret.key_block.key_value.key_material.value)
|
||||
self.secret_store.client.register.assert_called_once_with(
|
||||
objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
128,
|
||||
base64.b64decode(utils.get_symmetric_key())))
|
||||
|
||||
def test_store_symmetric_secret_return_value(self):
|
||||
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
|
||||
@ -433,25 +377,17 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
|
||||
def test_store_passphrase_secret_assert_called(self):
|
||||
key_spec = secret_store.KeySpec(None, None, None)
|
||||
passphrase = b"supersecretpassphrase"
|
||||
passphrase = base64.b64encode(b"supersecretpassphrase")
|
||||
secret_dto = secret_store.SecretDTO(secret_store.SecretType.PASSPHRASE,
|
||||
base64.b64encode(passphrase),
|
||||
passphrase,
|
||||
key_spec,
|
||||
'content_type',
|
||||
transport_key=None)
|
||||
self.secret_store.store_secret(secret_dto)
|
||||
self.secret_store.client.proxy.register.assert_called_once_with(
|
||||
enums.ObjectType.SECRET_DATA,
|
||||
mock.ANY,
|
||||
mock.ANY)
|
||||
proxy = self.secret_store.client.proxy
|
||||
register_call_args, _ = proxy.register.call_args
|
||||
actual_secret = register_call_args[2]
|
||||
self.assertIsNone(actual_secret.key_block.cryptographic_length)
|
||||
self.assertIsNone(actual_secret.key_block.cryptographic_algorithm)
|
||||
self.assertEqual(
|
||||
passphrase,
|
||||
actual_secret.key_block.key_value.key_material.value)
|
||||
self.secret_store.client.register.assert_called_once_with(
|
||||
objects.SecretData(
|
||||
base64.b64decode(passphrase),
|
||||
enums.SecretDataType.PASSWORD))
|
||||
|
||||
def test_store_passphrase_secret_return_value(self):
|
||||
key_spec = secret_store.KeySpec(None, None, None)
|
||||
@ -468,26 +404,17 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
|
||||
def test_store_opaque_secret_assert_called(self):
|
||||
key_spec = secret_store.KeySpec(None, None, None)
|
||||
opaque = b'\x00\x01\x02\x03\x04\x05\x06\x07'
|
||||
opaque = base64.b64encode(b'\x00\x01\x02\x03\x04\x05\x06\x07')
|
||||
secret_dto = secret_store.SecretDTO(secret_store.SecretType.OPAQUE,
|
||||
base64.b64encode(opaque),
|
||||
opaque,
|
||||
key_spec,
|
||||
'content_type',
|
||||
transport_key=None)
|
||||
self.secret_store.store_secret(secret_dto)
|
||||
self.secret_store.client.proxy.register.assert_called_once_with(
|
||||
enums.ObjectType.OPAQUE_DATA,
|
||||
mock.ANY,
|
||||
mock.ANY)
|
||||
proxy = self.secret_store.client.proxy
|
||||
register_call_args, _ = proxy.register.call_args
|
||||
actual_secret = register_call_args[2]
|
||||
self.assertEqual(
|
||||
Opaque.OpaqueDataType(enums.OpaqueDataType.NONE),
|
||||
actual_secret.opaque_data_type)
|
||||
self.assertEqual(
|
||||
Opaque.OpaqueDataValue(opaque),
|
||||
actual_secret.opaque_data_value)
|
||||
self.secret_store.client.register.assert_called_once_with(
|
||||
objects.OpaqueObject(
|
||||
base64.b64decode(opaque),
|
||||
enums.OpaqueDataType.NONE))
|
||||
|
||||
def test_store_opaque_secret_return_value(self):
|
||||
key_spec = secret_store.KeySpec(None, None, None)
|
||||
@ -533,28 +460,36 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
kmip_key,
|
||||
pkcs1_only):
|
||||
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 2048)
|
||||
secret_value = base64.b64encode(barbican_key)
|
||||
secret_dto = secret_store.SecretDTO(barbican_type,
|
||||
base64.b64encode(barbican_key),
|
||||
secret_value,
|
||||
key_spec,
|
||||
'content_type')
|
||||
self.secret_store.pkcs1_only = pkcs1_only
|
||||
self.secret_store.store_secret(secret_dto)
|
||||
self.secret_store.client.proxy.register.assert_called_once_with(
|
||||
kmip_type,
|
||||
mock.ANY,
|
||||
mock.ANY)
|
||||
proxy = self.secret_store.client.proxy
|
||||
register_call_args, _ = proxy.register.call_args
|
||||
actual_secret = register_call_args[2]
|
||||
self.assertEqual(
|
||||
2048,
|
||||
actual_secret.key_block.cryptographic_length.value)
|
||||
self.assertEqual(
|
||||
attr.CryptographicAlgorithm(enums.CryptographicAlgorithm.RSA),
|
||||
actual_secret.key_block.cryptographic_algorithm)
|
||||
self.assertEqual(
|
||||
kmip_key,
|
||||
actual_secret.key_block.key_value.key_material.value)
|
||||
secret_value = base64.b64decode(secret_value)
|
||||
if not pkcs1_only:
|
||||
secret_value = translations.convert_pem_to_der(
|
||||
secret_value,
|
||||
barbican_type)
|
||||
if kmip_type == enums.ObjectType.PUBLIC_KEY:
|
||||
if pkcs1_only:
|
||||
secret_value = kss.get_public_key_der_pkcs1(secret_value)
|
||||
secret = objects.PublicKey(
|
||||
enums.CryptographicAlgorithm.RSA,
|
||||
2048,
|
||||
secret_value,
|
||||
enums.KeyFormatType.X_509)
|
||||
else:
|
||||
if pkcs1_only:
|
||||
secret_value = kss.get_private_key_der_pkcs1(secret_value)
|
||||
secret = objects.PrivateKey(
|
||||
enums.CryptographicAlgorithm.RSA,
|
||||
2048,
|
||||
secret_value,
|
||||
enums.KeyFormatType.PKCS_8)
|
||||
|
||||
self.secret_store.client.register.assert_called_once_with(secret)
|
||||
|
||||
@utils.parameterized_dataset({
|
||||
'private_pkcs8': [secret_store.SecretType.PRIVATE,
|
||||
@ -592,22 +527,17 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
def test_store_certificate_secret_assert_called(
|
||||
self, algorithm, bit_length):
|
||||
key_spec = secret_store.KeySpec(algorithm, bit_length)
|
||||
certificate_value = base64.b64encode(keys.get_certificate_pem())
|
||||
secret_dto = secret_store.SecretDTO(
|
||||
secret_store.SecretType.CERTIFICATE,
|
||||
base64.b64encode(keys.get_certificate_pem()),
|
||||
certificate_value,
|
||||
key_spec,
|
||||
'content_type')
|
||||
self.secret_store.store_secret(secret_dto)
|
||||
self.secret_store.client.proxy.register.assert_called_once_with(
|
||||
enums.ObjectType.CERTIFICATE,
|
||||
mock.ANY,
|
||||
mock.ANY)
|
||||
proxy = self.secret_store.client.proxy
|
||||
register_call_args, _ = proxy.register.call_args
|
||||
actual_secret = register_call_args[2]
|
||||
self.assertEqual(
|
||||
keys.get_certificate_der(),
|
||||
actual_secret.certificate_value.value)
|
||||
self.secret_store.client.register.assert_called_once_with(
|
||||
objects.X509Certificate(translations.convert_pem_to_der(
|
||||
base64.b64decode(certificate_value),
|
||||
secret_store.SecretType.CERTIFICATE)))
|
||||
|
||||
def test_store_certificate_secret_return_value(self):
|
||||
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 2048)
|
||||
@ -622,9 +552,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
self.assertEqual(expected, return_value)
|
||||
|
||||
def test_store_secret_server_error_occurs(self):
|
||||
self.secret_store.client.proxy.register = mock.MagicMock(
|
||||
proxy.KMIPProxy().register, return_value=results.RegisterResult(
|
||||
contents.ResultStatus(enums.ResultStatus.OPERATION_FAILED)))
|
||||
self.secret_store.client.register.side_effect = Exception
|
||||
|
||||
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
|
||||
128, 'mode')
|
||||
@ -666,8 +594,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
secret_dto)
|
||||
|
||||
def test_store_secret_error_opening_connection(self):
|
||||
self.secret_store.client.proxy.open = mock.Mock(
|
||||
side_effect=socket.error)
|
||||
self.secret_store.client.open.side_effect = Exception
|
||||
|
||||
key_spec = secret_store.KeySpec(secret_store.KeyAlgorithm.AES,
|
||||
128, 'mode')
|
||||
@ -687,36 +614,26 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
@utils.parameterized_dataset({
|
||||
'symmetric': [get_sample_symmetric_key(),
|
||||
secret_store.SecretType.SYMMETRIC,
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
misc.KeyFormatType(enums.KeyFormatType.RAW),
|
||||
utils.get_symmetric_key(),
|
||||
False],
|
||||
'hmac_sha1': [get_sample_symmetric_key(
|
||||
algorithm=enums.CryptographicAlgorithm.HMAC_SHA1),
|
||||
secret_store.SecretType.SYMMETRIC,
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
misc.KeyFormatType(enums.KeyFormatType.RAW),
|
||||
utils.get_symmetric_key(),
|
||||
False],
|
||||
'hmac_sha256': [get_sample_symmetric_key(
|
||||
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256),
|
||||
secret_store.SecretType.SYMMETRIC,
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
misc.KeyFormatType(enums.KeyFormatType.RAW),
|
||||
utils.get_symmetric_key(),
|
||||
False],
|
||||
'hmac_sha384': [get_sample_symmetric_key(
|
||||
algorithm=enums.CryptographicAlgorithm.HMAC_SHA384),
|
||||
secret_store.SecretType.SYMMETRIC,
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
misc.KeyFormatType(enums.KeyFormatType.RAW),
|
||||
utils.get_symmetric_key(),
|
||||
False],
|
||||
'hmac_sha512': [get_sample_symmetric_key(
|
||||
algorithm=enums.CryptographicAlgorithm.HMAC_SHA512),
|
||||
secret_store.SecretType.SYMMETRIC,
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
misc.KeyFormatType(enums.KeyFormatType.RAW),
|
||||
utils.get_symmetric_key(),
|
||||
False],
|
||||
'triple_des': [get_sample_symmetric_key(
|
||||
@ -724,72 +641,50 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
key_length=192,
|
||||
algorithm=enums.CryptographicAlgorithm.TRIPLE_DES),
|
||||
secret_store.SecretType.SYMMETRIC,
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
misc.KeyFormatType(enums.KeyFormatType.RAW),
|
||||
utils.get_triple_des_key(),
|
||||
False],
|
||||
'opaque': [get_sample_opaque_secret(),
|
||||
secret_store.SecretType.OPAQUE,
|
||||
enums.ObjectType.OPAQUE_DATA,
|
||||
misc.KeyFormatType(enums.KeyFormatType.RAW),
|
||||
utils.get_symmetric_key(),
|
||||
False],
|
||||
'public_key': [get_sample_public_key(),
|
||||
secret_store.SecretType.PUBLIC,
|
||||
enums.ObjectType.PUBLIC_KEY,
|
||||
misc.KeyFormatType(enums.KeyFormatType.X_509),
|
||||
base64.b64encode(keys.get_public_key_pem()),
|
||||
False],
|
||||
'public_key_pkcs1': [get_sample_public_key(pkcs1=True),
|
||||
secret_store.SecretType.PUBLIC,
|
||||
enums.ObjectType.PUBLIC_KEY,
|
||||
misc.KeyFormatType(enums.KeyFormatType.PKCS_1),
|
||||
base64.b64encode(keys.get_public_key_pem()),
|
||||
True],
|
||||
'private_key': [get_sample_private_key(),
|
||||
secret_store.SecretType.PRIVATE,
|
||||
enums.ObjectType.PRIVATE_KEY,
|
||||
misc.KeyFormatType(enums.KeyFormatType.PKCS_8),
|
||||
base64.b64encode(keys.get_private_key_pem()),
|
||||
False],
|
||||
'private_key_pkcs1': [get_sample_private_key(pkcs1=True),
|
||||
secret_store.SecretType.PRIVATE,
|
||||
enums.ObjectType.PRIVATE_KEY,
|
||||
misc.KeyFormatType(enums.KeyFormatType.PKCS_1),
|
||||
base64.b64encode(keys.get_private_key_pem()),
|
||||
True],
|
||||
'certificate': [get_sample_certificate(),
|
||||
secret_store.SecretType.CERTIFICATE,
|
||||
enums.ObjectType.CERTIFICATE,
|
||||
None,
|
||||
base64.b64encode(keys.get_certificate_pem()),
|
||||
False]
|
||||
})
|
||||
def test_get_secret(self, kmip_secret, secret_type, kmip_type,
|
||||
key_format_type, expected_secret, pkcs1_only):
|
||||
def test_get_secret(self, kmip_secret, secret_type, expected_secret,
|
||||
pkcs1_only):
|
||||
self.secret_store.pkcs1_only = pkcs1_only
|
||||
self.secret_store.client.proxy.get = mock.MagicMock(
|
||||
proxy.KMIPProxy().get, return_value=results.GetResult(
|
||||
contents.ResultStatus(enums.ResultStatus.SUCCESS),
|
||||
object_type=attr.ObjectType(kmip_type),
|
||||
secret=kmip_secret))
|
||||
self.secret_store.client.get.return_value = kmip_secret
|
||||
uuid = utils.generate_test_uuid(0)
|
||||
metadata = {kss.KMIPSecretStore.KEY_UUID: uuid}
|
||||
secret_dto = self.secret_store.get_secret(secret_type, metadata)
|
||||
|
||||
self.secret_store.client.proxy.get.assert_called_once_with(uuid)
|
||||
self.secret_store.client.get.assert_called_once_with(uuid)
|
||||
self.assertEqual(secret_store.SecretDTO, type(secret_dto))
|
||||
self.assertEqual(secret_type, secret_dto.type)
|
||||
self.assertEqual(expected_secret, secret_dto.secret)
|
||||
|
||||
def test_get_secret_symmetric_return_value_invalid_key_material_type(self):
|
||||
sample_secret = self.sample_secret
|
||||
sample_secret.key_block.key_value.key_material = 'invalid_type'
|
||||
self.secret_store.client.proxy.get = mock.MagicMock(
|
||||
proxy.KMIPProxy().get, return_value=results.GetResult(
|
||||
contents.ResultStatus(enums.ResultStatus.SUCCESS),
|
||||
object_type=attr.ObjectType(enums.ObjectType.SYMMETRIC_KEY),
|
||||
secret=sample_secret))
|
||||
invalid_secret = self.sample_secret
|
||||
invalid_secret.value = list('invalid')
|
||||
self.secret_store.client.get.return_value = invalid_secret
|
||||
|
||||
metadata = {kss.KMIPSecretStore.KEY_UUID: self.symmetric_key_uuid}
|
||||
self.assertRaises(
|
||||
@ -798,9 +693,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
self.symmetric_type, metadata)
|
||||
|
||||
def test_get_secret_symmetric_server_error_occurs(self):
|
||||
self.secret_store.client.proxy.get = mock.MagicMock(
|
||||
proxy.KMIPProxy().get, return_value=results.GetResult(
|
||||
contents.ResultStatus(enums.ResultStatus.OPERATION_FAILED)))
|
||||
self.secret_store.client.get.side_effect = Exception
|
||||
metadata = {kss.KMIPSecretStore.KEY_UUID: self.symmetric_key_uuid}
|
||||
self.assertRaises(
|
||||
secret_store.SecretGeneralException,
|
||||
@ -808,8 +701,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
self.symmetric_type, metadata)
|
||||
|
||||
def test_get_secret_symmetric_error_opening_connection(self):
|
||||
self.secret_store.client.proxy.open = mock.Mock(
|
||||
side_effect=socket.error)
|
||||
self.secret_store.client.open.side_effect = Exception
|
||||
|
||||
metadata = {kss.KMIPSecretStore.KEY_UUID: self.symmetric_key_uuid}
|
||||
self.assertRaises(
|
||||
@ -826,7 +718,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
def test_delete_secret_assert_called(self):
|
||||
metadata = {kss.KMIPSecretStore.KEY_UUID: self.symmetric_key_uuid}
|
||||
self.secret_store.delete_secret(metadata)
|
||||
self.secret_store.client.proxy.destroy.assert_called_once_with(
|
||||
self.secret_store.client.destroy.assert_called_once_with(
|
||||
self.symmetric_key_uuid)
|
||||
|
||||
def test_delete_secret_return_value(self):
|
||||
@ -835,9 +727,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
self.assertIsNone(return_value)
|
||||
|
||||
def test_delete_secret_server_error_occurs(self):
|
||||
self.secret_store.client.proxy.destroy = mock.MagicMock(
|
||||
proxy.KMIPProxy().destroy, return_value=results.DestroyResult(
|
||||
contents.ResultStatus(enums.ResultStatus.OPERATION_FAILED)))
|
||||
self.secret_store.client.destroy.side_effect = Exception
|
||||
metadata = {kss.KMIPSecretStore.KEY_UUID: self.symmetric_key_uuid}
|
||||
self.assertRaises(
|
||||
secret_store.SecretGeneralException,
|
||||
@ -845,8 +735,7 @@ class WhenTestingKMIPSecretStore(utils.BaseTestCase):
|
||||
metadata)
|
||||
|
||||
def test_delete_secret_error_opening_connection(self):
|
||||
self.secret_store.client.proxy.open = mock.Mock(
|
||||
side_effect=socket.error)
|
||||
self.secret_store.client.open.side_effect = Exception
|
||||
metadata = {kss.KMIPSecretStore.KEY_UUID: self.symmetric_key_uuid}
|
||||
self.assertRaises(
|
||||
secret_store.SecretGeneralException,
|
||||
|
Loading…
x
Reference in New Issue
Block a user