Merge pull request #303 from OpenKMIP/feat/add-derive-key-to-server

Add DeriveKey support to the server
This commit is contained in:
Peter Hamilton 2017-07-25 13:42:30 -04:00 committed by GitHub
commit f86134878b
3 changed files with 860 additions and 31 deletions

View File

@ -42,6 +42,7 @@ from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import revoke from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import derive_key
from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import encrypt from kmip.core.messages.payloads import encrypt
@ -963,6 +964,8 @@ class KmipEngine(object):
return self._process_create_key_pair(payload) return self._process_create_key_pair(payload)
elif operation == enums.Operation.REGISTER: elif operation == enums.Operation.REGISTER:
return self._process_register(payload) return self._process_register(payload)
elif operation == enums.Operation.DERIVE_KEY:
return self._process_derive_key(payload)
elif operation == enums.Operation.LOCATE: elif operation == enums.Operation.LOCATE:
return self._process_locate(payload) return self._process_locate(payload)
elif operation == enums.Operation.GET: elif operation == enums.Operation.GET:
@ -1330,6 +1333,188 @@ class KmipEngine(object):
return response_payload return response_payload
@_kmip_version_supported('1.0')
def _process_derive_key(self, payload):
self._logger.info("Processing operation: DeriveKey")
object_attributes = {}
if payload.template_attribute:
object_attributes = self._process_template_attribute(
payload.template_attribute
)
if payload.object_type not in [
enums.ObjectType.SYMMETRIC_KEY,
enums.ObjectType.SECRET_DATA
]:
raise exceptions.InvalidField(
"Key derivation can only generate a SymmetricKey or "
"SecretData object."
)
# Retrieve existing managed objects to be used in the key derivation
# process. If any are unaccessible or not suitable for key derivation,
# raise an error.
existing_objects = []
for unique_identifier in payload.unique_identifiers:
managed_object = self._get_object_with_access_controls(
unique_identifier,
enums.Operation.GET
)
if managed_object._object_type not in [
enums.ObjectType.SECRET_DATA,
enums.ObjectType.SYMMETRIC_KEY,
enums.ObjectType.PUBLIC_KEY,
enums.ObjectType.PRIVATE_KEY
]:
raise exceptions.InvalidField(
"Object {0} is not a suitable type for key "
"derivation. Please specify a key or secret data.".format(
unique_identifier
)
)
elif enums.CryptographicUsageMask.DERIVE_KEY not in \
managed_object.cryptographic_usage_masks:
raise exceptions.InvalidField(
"The DeriveKey bit must be set in the cryptographic usage "
"mask for object {0} for it to be used in key "
"derivation.".format(unique_identifier)
)
else:
existing_objects.append(managed_object)
if len(existing_objects) > 1:
self._logger.info(
"{0} derivation objects specified with the DeriveKey "
"request.".format(len(existing_objects))
)
# Select the derivation object to use as the keying material
keying_object = existing_objects[0]
self._logger.info(
"Object {0} will be used as the keying material for the "
"derivation process.".format(keying_object.unique_identifier)
)
derivation_parameters = payload.derivation_parameters
derivation_data = None
if derivation_parameters.derivation_data is None:
if len(existing_objects) > 1:
for alternate in existing_objects[1:]:
if alternate._object_type == enums.ObjectType.SECRET_DATA:
self._logger.info(
"Object {0} will be used as the derivation data "
"for the derivation process.".format(
alternate.unique_identifier
)
)
derivation_data = alternate.value
break
else:
derivation_data = derivation_parameters.derivation_data
iv = b''
if derivation_parameters.initialization_vector is not None:
iv = derivation_parameters.initialization_vector
# Get the derivation length from the template attribute. It is
# required so if it cannot be found, raise an error.
derivation_length = None
attribute = object_attributes.get('Cryptographic Length')
if attribute:
derivation_length = attribute.value
if (derivation_length % 8) == 0:
derivation_length //= 8
else:
raise exceptions.InvalidField(
"The cryptographic length must correspond to a valid "
"number of bytes (i.e., it must be a multiple of 8)."
)
else:
raise exceptions.InvalidField(
"The cryptographic length must be provided in the template "
"attribute."
)
cryptographic_algorithm = None
if payload.object_type == enums.ObjectType.SYMMETRIC_KEY:
attribute = object_attributes.get('Cryptographic Algorithm')
if attribute:
cryptographic_algorithm = attribute.value
else:
raise exceptions.InvalidField(
"The cryptographic algorithm must be provided in the "
"template attribute when deriving a symmetric key."
)
# TODO (peterhamilton): Pull cryptographic parameters from the keying
# object if none are provided with the payload
crypto_parameters = derivation_parameters.cryptographic_parameters
derived_data = self._cryptography_engine.derive_key(
derivation_method=payload.derivation_method,
derivation_length=derivation_length,
derivation_data=derivation_data,
key_material=keying_object.value,
hash_algorithm=crypto_parameters.hashing_algorithm,
salt=derivation_parameters.salt,
iteration_count=derivation_parameters.iteration_count,
encryption_algorithm=crypto_parameters.cryptographic_algorithm,
cipher_mode=crypto_parameters.block_cipher_mode,
padding_method=crypto_parameters.padding_method,
iv_nonce=iv
)
if derivation_length > len(derived_data):
raise exceptions.CryptographicFailure(
"The specified length exceeds the output of the derivation "
"method."
)
if payload.object_type == enums.ObjectType.SYMMETRIC_KEY:
managed_object = objects.SymmetricKey(
algorithm=cryptographic_algorithm,
length=(derivation_length * 8),
value=derived_data,
)
else:
managed_object = objects.SecretData(
value=derived_data,
data_type=enums.SecretDataType.SEED,
)
managed_object.names = []
if payload.object_type == enums.ObjectType.SECRET_DATA:
del object_attributes['Cryptographic Length']
self._set_attributes_on_managed_object(
managed_object,
object_attributes
)
# TODO (peterhamilton) Set additional server-only attributes.
managed_object._owner = self._client_identity
managed_object.initial_date = int(time.time())
self._data_session.add(managed_object)
self._data_session.commit()
self._logger.info(
"Created a {0} with ID: {1}".format(
''.join(
[x.capitalize() for x in
payload.object_type.name.split('_')]
),
managed_object.unique_identifier
)
)
self._id_placeholder = str(managed_object.unique_identifier)
response_payload = derive_key.DeriveKeyResponsePayload(
unique_identifier=str(managed_object.unique_identifier)
)
return response_payload
@_kmip_version_supported('1.0') @_kmip_version_supported('1.0')
def _process_locate(self, payload): def _process_locate(self, payload):
# TODO: Need to complete the filtering logic based on all given # TODO: Need to complete the filtering logic based on all given
@ -1734,6 +1919,7 @@ class KmipEngine(object):
contents.Operation(enums.Operation.CREATE), contents.Operation(enums.Operation.CREATE),
contents.Operation(enums.Operation.CREATE_KEY_PAIR), contents.Operation(enums.Operation.CREATE_KEY_PAIR),
contents.Operation(enums.Operation.REGISTER), contents.Operation(enums.Operation.REGISTER),
contents.Operation(enums.Operation.DERIVE_KEY),
contents.Operation(enums.Operation.LOCATE), contents.Operation(enums.Operation.LOCATE),
contents.Operation(enums.Operation.GET), contents.Operation(enums.Operation.GET),
contents.Operation(enums.Operation.GET_ATTRIBUTES), contents.Operation(enums.Operation.GET_ATTRIBUTES),

View File

@ -1008,10 +1008,9 @@ def test_handle_symmetric_padding_undo(symmetric_padding_parameters):
# #
# https://www.ietf.org/rfc/rfc6070.txt # https://www.ietf.org/rfc/rfc6070.txt
# #
# HMAC test vectors were obtained from IETF RFC 2202 and RFC 4231: # HMAC test vectors were obtained from IETF RFC 5869:
# #
# https://tools.ietf.org/html/rfc2202 # https://tools.ietf.org/html/rfc5869
# https://tools.ietf.org/html/rfc4231
# #
# HASH test vectors for SHA1/SHA224/SHA256/SHA384/SHA512 # HASH test vectors for SHA1/SHA224/SHA256/SHA384/SHA512
# were obtained from the NIST CAVP test suite. Test vectors for MD5 were # were obtained from the NIST CAVP test suite. Test vectors for MD5 were

View File

@ -43,6 +43,7 @@ from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import revoke from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import derive_key
from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import encrypt from kmip.core.messages.payloads import encrypt
@ -3477,6 +3478,637 @@ class TestKmipEngine(testtools.TestCase):
*args *args
) )
def test_derive_key(self):
"""
Test that a DeriveKey request can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256,
length=176,
value=(
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
attribute_factory = factory.AttributeFactory()
# Derive a SymmetricKey object.
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SYMMETRIC_KEY,
unique_identifiers=[str(base_key.unique_identifier)],
derivation_method=enums.DerivationMethod.HMAC,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.SHA_256
),
derivation_data=(
b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7'
b'\xf8\xf9'
),
salt=(
b'\x00\x01\x02\x03\x04\x05\x06\x07'
b'\x08\x09\x0a\x0b\x0c'
)
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
336
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
]
)
)
response_payload = e._process_derive_key(payload)
e._logger.info.assert_any_call("Processing operation: DeriveKey")
e._logger.info.assert_any_call(
"Object 1 will be used as the keying material for the derivation "
"process."
)
e._logger.info.assert_any_call("Created a SymmetricKey with ID: 2")
self.assertEqual("2", response_payload.unique_identifier)
managed_object = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.SymmetricKey.unique_identifier == 2
).one()
self.assertEqual(
(
b'\x3c\xb2\x5f\x25\xfa\xac\xd5\x7a'
b'\x90\x43\x4f\x64\xd0\x36\x2f\x2a'
b'\x2d\x2d\x0a\x90\xcf\x1a\x5a\x4c'
b'\x5d\xb0\x2d\x56\xec\xc4\xc5\xbf'
b'\x34\x00\x72\x08\xd5\xb8\x87\x18'
b'\x58\x65'
),
managed_object.value
)
self.assertEqual(
enums.CryptographicAlgorithm.AES,
managed_object.cryptographic_algorithm
)
self.assertEqual(
336,
managed_object.cryptographic_length
)
self.assertIsNotNone(managed_object.initial_date)
e._logger.reset_mock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.BLOWFISH,
length=128,
value=(
b'\x01\x23\x45\x67\x89\xAB\xCD\xEF'
b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
# Derive a SecretData object.
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SECRET_DATA,
unique_identifiers=[str(base_key.unique_identifier)],
derivation_method=enums.DerivationMethod.ENCRYPT,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
block_cipher_mode=enums.BlockCipherMode.CBC,
padding_method=enums.PaddingMethod.PKCS5,
hashing_algorithm=enums.HashingAlgorithm.SHA_256,
cryptographic_algorithm=(
enums.CryptographicAlgorithm.BLOWFISH
)
),
initialization_vector=b'\xFE\xDC\xBA\x98\x76\x54\x32\x10',
derivation_data=(
b'\x37\x36\x35\x34\x33\x32\x31\x20'
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
b'\x66\x6F\x72\x20\x00'
),
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
256
)
]
)
)
response_payload = e._process_derive_key(payload)
e._logger.info.assert_any_call("Processing operation: DeriveKey")
e._logger.info.assert_any_call(
"Object 3 will be used as the keying material for the derivation "
"process."
)
e._logger.info.assert_any_call("Created a SecretData with ID: 4")
self.assertEqual("4", response_payload.unique_identifier)
managed_object = e._data_session.query(
pie_objects.SecretData
).filter(
pie_objects.SecretData.unique_identifier == 4
).one()
self.assertEqual(
(
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
),
managed_object.value
)
self.assertEqual(enums.SecretDataType.SEED, managed_object.data_type)
self.assertIsNotNone(managed_object.initial_date)
def test_derive_key_invalid_derivation_type(self):
"""
Test that the right error is thrown when an invalid derivation type
is provided with a DeriveKey request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.CERTIFICATE
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"Key derivation can only generate a SymmetricKey or SecretData "
"object.",
e._process_derive_key,
*args
)
def test_derive_key_invalid_base_key(self):
"""
Test that the right error is thrown when an object not suitable for
key derivation is provided as the base key with a DeriveKey request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
invalid_key = pie_objects.OpaqueObject(
b'\x01\x02\x04\x08\x10\x20\x40\x80',
enums.OpaqueDataType.NONE
)
e._data_session.add(invalid_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SECRET_DATA,
unique_identifiers=[str(invalid_key.unique_identifier)]
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"Object 1 is not a suitable type for key derivation. Please "
"specify a key or secret data.",
e._process_derive_key,
*args
)
def test_derive_key_non_derivable_base_key(self):
"""
Test that the right error is thrown when an object suitable for
key derivation but not marked as such is provided as the base key
with a DeriveKey request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
128,
(
b'\x00\x01\x02\x03\x04\x05\x06\x07'
b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F'
),
[enums.CryptographicUsageMask.ENCRYPT]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SECRET_DATA,
unique_identifiers=[str(base_key.unique_identifier)]
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"The DeriveKey bit must be set in the cryptographic usage mask "
"for object 1 for it to be used in key derivation.",
e._process_derive_key,
*args
)
def test_derive_key_alternate_derivation_data(self):
"""
Test that a DeriveKey request can be processed correctly by
specifying multiple base objects and no derivation data.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256,
length=176,
value=(
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
base_data = pie_objects.SecretData(
value=(
b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7'
b'\xf8\xf9'
),
data_type=enums.SecretDataType.SEED,
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_data)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
attribute_factory = factory.AttributeFactory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SYMMETRIC_KEY,
unique_identifiers=[
str(base_key.unique_identifier),
str(base_data.unique_identifier)
],
derivation_method=enums.DerivationMethod.HMAC,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.SHA_256
),
salt=(
b'\x00\x01\x02\x03\x04\x05\x06\x07'
b'\x08\x09\x0a\x0b\x0c'
)
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
336
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
]
)
)
response_payload = e._process_derive_key(payload)
e._logger.info.assert_any_call("Processing operation: DeriveKey")
e._logger.info.assert_any_call(
"2 derivation objects specified with the DeriveKey request."
)
e._logger.info.assert_any_call(
"Object 1 will be used as the keying material for the derivation "
"process."
)
e._logger.info.assert_any_call(
"Object 2 will be used as the derivation data for the derivation "
"process."
)
e._logger.info.assert_any_call("Created a SymmetricKey with ID: 3")
self.assertEqual("3", response_payload.unique_identifier)
managed_object = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.SymmetricKey.unique_identifier == 3
).one()
self.assertEqual(
(
b'\x3c\xb2\x5f\x25\xfa\xac\xd5\x7a'
b'\x90\x43\x4f\x64\xd0\x36\x2f\x2a'
b'\x2d\x2d\x0a\x90\xcf\x1a\x5a\x4c'
b'\x5d\xb0\x2d\x56\xec\xc4\xc5\xbf'
b'\x34\x00\x72\x08\xd5\xb8\x87\x18'
b'\x58\x65'
),
managed_object.value
)
self.assertEqual(
enums.CryptographicAlgorithm.AES,
managed_object.cryptographic_algorithm
)
self.assertEqual(
336,
managed_object.cryptographic_length
)
self.assertIsNotNone(managed_object.initial_date)
def test_derive_key_unspecified_iv(self):
"""
"""
self.skip('')
def test_derive_key_missing_cryptographic_length(self):
"""
Test that the right error is thrown when the cryptographic length is
missing from a DeriveKey request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256,
length=160,
value=(
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
attribute_factory = factory.AttributeFactory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SYMMETRIC_KEY,
unique_identifiers=[str(base_key.unique_identifier)],
derivation_method=enums.DerivationMethod.HMAC,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.SHA_256
),
derivation_data=b'\x48\x69\x20\x54\x68\x65\x72\x65',
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
]
)
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"The cryptographic length must be provided in the template "
"attribute.",
e._process_derive_key,
*args
)
def test_derive_key_invalid_cryptographic_length(self):
"""
Test that the right error is thrown when an invalid cryptographic
length is provided with a DeriveKey request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256,
length=160,
value=(
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
attribute_factory = factory.AttributeFactory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SYMMETRIC_KEY,
unique_identifiers=[str(base_key.unique_identifier)],
derivation_method=enums.DerivationMethod.HMAC,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.SHA_256
),
derivation_data=b'\x48\x69\x20\x54\x68\x65\x72\x65',
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
123
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
]
)
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"The cryptographic length must correspond to a valid number of "
"bytes \(i.e., it must be a multiple of 8\).",
e._process_derive_key,
*args
)
def test_derive_key_missing_cryptographic_algorithm(self):
"""
Test that the right error is thrown when the cryptographic algorithm
is missing from a DeriveKey request when deriving a symmetric key.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256,
length=160,
value=(
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
attribute_factory = factory.AttributeFactory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SYMMETRIC_KEY,
unique_identifiers=[str(base_key.unique_identifier)],
derivation_method=enums.DerivationMethod.HMAC,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.SHA_256
),
derivation_data=b'\x48\x69\x20\x54\x68\x65\x72\x65',
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
256
)
]
)
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"The cryptographic algorithm must be provided in the template "
"attribute when deriving a symmetric key.",
e._process_derive_key,
*args
)
def test_derive_key_oversized_cryptographic_length(self):
"""
Test that the right error is thrown when an invalid cryptographic
length is provided with a DeriveKey request.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._cryptography_engine.logger = mock.MagicMock()
base_key = pie_objects.SymmetricKey(
algorithm=enums.CryptographicAlgorithm.HMAC_SHA256,
length=160,
value=(
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
b'\x0b\x0b\x0b\x0b'
),
masks=[enums.CryptographicUsageMask.DERIVE_KEY]
)
e._data_session.add(base_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._cryptography_engine = mock.MagicMock()
e._cryptography_engine.derive_key.return_value = b''
attribute_factory = factory.AttributeFactory()
payload = derive_key.DeriveKeyRequestPayload(
object_type=enums.ObjectType.SYMMETRIC_KEY,
unique_identifiers=[str(base_key.unique_identifier)],
derivation_method=enums.DerivationMethod.HMAC,
derivation_parameters=attributes.DerivationParameters(
cryptographic_parameters=attributes.CryptographicParameters(
hashing_algorithm=enums.HashingAlgorithm.SHA_256
),
derivation_data=b'\x48\x69\x20\x54\x68\x65\x72\x65',
),
template_attribute=objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
256
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
]
)
)
args = (payload, )
self.assertRaisesRegexp(
exceptions.CryptographicFailure,
"The specified length exceeds the output of the derivation "
"method.",
e._process_derive_key,
*args
)
def test_locate(self): def test_locate(self):
""" """
Test that a Locate request can be processed correctly. Test that a Locate request can be processed correctly.
@ -5620,7 +6252,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query") e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations) self.assertIsNotNone(result.operations)
self.assertEqual(10, len(result.operations)) self.assertEqual(11, len(result.operations))
self.assertEqual( self.assertEqual(
enums.Operation.CREATE, enums.Operation.CREATE,
result.operations[0].value result.operations[0].value
@ -5634,33 +6266,37 @@ class TestKmipEngine(testtools.TestCase):
result.operations[2].value result.operations[2].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.LOCATE, enums.Operation.DERIVE_KEY,
result.operations[3].value result.operations[3].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.GET, enums.Operation.LOCATE,
result.operations[4].value result.operations[4].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.GET_ATTRIBUTES, enums.Operation.GET,
result.operations[5].value result.operations[5].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST, enums.Operation.GET_ATTRIBUTES,
result.operations[6].value result.operations[6].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.ACTIVATE, enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[7].value result.operations[7].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.DESTROY, enums.Operation.ACTIVATE,
result.operations[8].value result.operations[8].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.QUERY, enums.Operation.DESTROY,
result.operations[9].value result.operations[9].value
) )
self.assertEqual(
enums.Operation.QUERY,
result.operations[10].value
)
self.assertEqual(list(), result.object_types) self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification) self.assertIsNotNone(result.vendor_identification)
self.assertEqual( self.assertEqual(
@ -5698,7 +6334,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query") e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations) self.assertIsNotNone(result.operations)
self.assertEqual(11, len(result.operations)) self.assertEqual(12, len(result.operations))
self.assertEqual( self.assertEqual(
enums.Operation.CREATE, enums.Operation.CREATE,
result.operations[0].value result.operations[0].value
@ -5712,37 +6348,41 @@ class TestKmipEngine(testtools.TestCase):
result.operations[2].value result.operations[2].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.LOCATE, enums.Operation.DERIVE_KEY,
result.operations[3].value result.operations[3].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.GET, enums.Operation.LOCATE,
result.operations[4].value result.operations[4].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.GET_ATTRIBUTES, enums.Operation.GET,
result.operations[5].value result.operations[5].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST, enums.Operation.GET_ATTRIBUTES,
result.operations[6].value result.operations[6].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.ACTIVATE, enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[7].value result.operations[7].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.DESTROY, enums.Operation.ACTIVATE,
result.operations[8].value result.operations[8].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.QUERY, enums.Operation.DESTROY,
result.operations[9].value result.operations[9].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.DISCOVER_VERSIONS, enums.Operation.QUERY,
result.operations[10].value result.operations[10].value
) )
self.assertEqual(
enums.Operation.DISCOVER_VERSIONS,
result.operations[11].value
)
self.assertEqual(list(), result.object_types) self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification) self.assertIsNotNone(result.vendor_identification)
self.assertEqual( self.assertEqual(
@ -5780,7 +6420,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query") e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations) self.assertIsNotNone(result.operations)
self.assertEqual(13, len(result.operations)) self.assertEqual(14, len(result.operations))
self.assertEqual( self.assertEqual(
enums.Operation.CREATE, enums.Operation.CREATE,
result.operations[0].value result.operations[0].value
@ -5794,45 +6434,49 @@ class TestKmipEngine(testtools.TestCase):
result.operations[2].value result.operations[2].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.LOCATE, enums.Operation.DERIVE_KEY,
result.operations[3].value result.operations[3].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.GET, enums.Operation.LOCATE,
result.operations[4].value result.operations[4].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.GET_ATTRIBUTES, enums.Operation.GET,
result.operations[5].value result.operations[5].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST, enums.Operation.GET_ATTRIBUTES,
result.operations[6].value result.operations[6].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.ACTIVATE, enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[7].value result.operations[7].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.DESTROY, enums.Operation.ACTIVATE,
result.operations[8].value result.operations[8].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.QUERY, enums.Operation.DESTROY,
result.operations[9].value result.operations[9].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.DISCOVER_VERSIONS, enums.Operation.QUERY,
result.operations[10].value result.operations[10].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.ENCRYPT, enums.Operation.DISCOVER_VERSIONS,
result.operations[11].value result.operations[11].value
) )
self.assertEqual( self.assertEqual(
enums.Operation.MAC, enums.Operation.ENCRYPT,
result.operations[12].value result.operations[12].value
) )
self.assertEqual(
enums.Operation.MAC,
result.operations[13].value
)
self.assertEqual(list(), result.object_types) self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification) self.assertIsNotNone(result.vendor_identification)
self.assertEqual( self.assertEqual(