diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index a3b539f..eee000b 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -42,6 +42,7 @@ from kmip.core.messages.payloads import activate from kmip.core.messages.payloads import revoke from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create_key_pair +from kmip.core.messages.payloads import derive_key from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import encrypt @@ -963,6 +964,8 @@ class KmipEngine(object): return self._process_create_key_pair(payload) elif operation == enums.Operation.REGISTER: return self._process_register(payload) + elif operation == enums.Operation.DERIVE_KEY: + return self._process_derive_key(payload) elif operation == enums.Operation.LOCATE: return self._process_locate(payload) elif operation == enums.Operation.GET: @@ -1330,6 +1333,188 @@ class KmipEngine(object): return response_payload + @_kmip_version_supported('1.0') + def _process_derive_key(self, payload): + self._logger.info("Processing operation: DeriveKey") + + object_attributes = {} + if payload.template_attribute: + object_attributes = self._process_template_attribute( + payload.template_attribute + ) + + if payload.object_type not in [ + enums.ObjectType.SYMMETRIC_KEY, + enums.ObjectType.SECRET_DATA + ]: + raise exceptions.InvalidField( + "Key derivation can only generate a SymmetricKey or " + "SecretData object." + ) + + # Retrieve existing managed objects to be used in the key derivation + # process. If any are unaccessible or not suitable for key derivation, + # raise an error. + existing_objects = [] + for unique_identifier in payload.unique_identifiers: + managed_object = self._get_object_with_access_controls( + unique_identifier, + enums.Operation.GET + ) + if managed_object._object_type not in [ + enums.ObjectType.SECRET_DATA, + enums.ObjectType.SYMMETRIC_KEY, + enums.ObjectType.PUBLIC_KEY, + enums.ObjectType.PRIVATE_KEY + ]: + raise exceptions.InvalidField( + "Object {0} is not a suitable type for key " + "derivation. Please specify a key or secret data.".format( + unique_identifier + ) + ) + elif enums.CryptographicUsageMask.DERIVE_KEY not in \ + managed_object.cryptographic_usage_masks: + raise exceptions.InvalidField( + "The DeriveKey bit must be set in the cryptographic usage " + "mask for object {0} for it to be used in key " + "derivation.".format(unique_identifier) + ) + else: + existing_objects.append(managed_object) + + if len(existing_objects) > 1: + self._logger.info( + "{0} derivation objects specified with the DeriveKey " + "request.".format(len(existing_objects)) + ) + + # Select the derivation object to use as the keying material + keying_object = existing_objects[0] + self._logger.info( + "Object {0} will be used as the keying material for the " + "derivation process.".format(keying_object.unique_identifier) + ) + + derivation_parameters = payload.derivation_parameters + + derivation_data = None + if derivation_parameters.derivation_data is None: + if len(existing_objects) > 1: + for alternate in existing_objects[1:]: + if alternate._object_type == enums.ObjectType.SECRET_DATA: + self._logger.info( + "Object {0} will be used as the derivation data " + "for the derivation process.".format( + alternate.unique_identifier + ) + ) + derivation_data = alternate.value + break + else: + derivation_data = derivation_parameters.derivation_data + + iv = b'' + if derivation_parameters.initialization_vector is not None: + iv = derivation_parameters.initialization_vector + + # Get the derivation length from the template attribute. It is + # required so if it cannot be found, raise an error. + derivation_length = None + attribute = object_attributes.get('Cryptographic Length') + if attribute: + derivation_length = attribute.value + if (derivation_length % 8) == 0: + derivation_length //= 8 + else: + raise exceptions.InvalidField( + "The cryptographic length must correspond to a valid " + "number of bytes (i.e., it must be a multiple of 8)." + ) + else: + raise exceptions.InvalidField( + "The cryptographic length must be provided in the template " + "attribute." + ) + + cryptographic_algorithm = None + if payload.object_type == enums.ObjectType.SYMMETRIC_KEY: + attribute = object_attributes.get('Cryptographic Algorithm') + if attribute: + cryptographic_algorithm = attribute.value + else: + raise exceptions.InvalidField( + "The cryptographic algorithm must be provided in the " + "template attribute when deriving a symmetric key." + ) + + # TODO (peterhamilton): Pull cryptographic parameters from the keying + # object if none are provided with the payload + crypto_parameters = derivation_parameters.cryptographic_parameters + derived_data = self._cryptography_engine.derive_key( + derivation_method=payload.derivation_method, + derivation_length=derivation_length, + derivation_data=derivation_data, + key_material=keying_object.value, + hash_algorithm=crypto_parameters.hashing_algorithm, + salt=derivation_parameters.salt, + iteration_count=derivation_parameters.iteration_count, + encryption_algorithm=crypto_parameters.cryptographic_algorithm, + cipher_mode=crypto_parameters.block_cipher_mode, + padding_method=crypto_parameters.padding_method, + iv_nonce=iv + ) + + if derivation_length > len(derived_data): + raise exceptions.CryptographicFailure( + "The specified length exceeds the output of the derivation " + "method." + ) + + if payload.object_type == enums.ObjectType.SYMMETRIC_KEY: + managed_object = objects.SymmetricKey( + algorithm=cryptographic_algorithm, + length=(derivation_length * 8), + value=derived_data, + ) + else: + managed_object = objects.SecretData( + value=derived_data, + data_type=enums.SecretDataType.SEED, + ) + + managed_object.names = [] + + if payload.object_type == enums.ObjectType.SECRET_DATA: + del object_attributes['Cryptographic Length'] + self._set_attributes_on_managed_object( + managed_object, + object_attributes + ) + + # TODO (peterhamilton) Set additional server-only attributes. + managed_object._owner = self._client_identity + managed_object.initial_date = int(time.time()) + + self._data_session.add(managed_object) + self._data_session.commit() + + self._logger.info( + "Created a {0} with ID: {1}".format( + ''.join( + [x.capitalize() for x in + payload.object_type.name.split('_')] + ), + managed_object.unique_identifier + ) + ) + self._id_placeholder = str(managed_object.unique_identifier) + + response_payload = derive_key.DeriveKeyResponsePayload( + unique_identifier=str(managed_object.unique_identifier) + ) + return response_payload + @_kmip_version_supported('1.0') def _process_locate(self, payload): # TODO: Need to complete the filtering logic based on all given @@ -1734,6 +1919,7 @@ class KmipEngine(object): contents.Operation(enums.Operation.CREATE), contents.Operation(enums.Operation.CREATE_KEY_PAIR), contents.Operation(enums.Operation.REGISTER), + contents.Operation(enums.Operation.DERIVE_KEY), contents.Operation(enums.Operation.LOCATE), contents.Operation(enums.Operation.GET), contents.Operation(enums.Operation.GET_ATTRIBUTES), diff --git a/kmip/tests/unit/services/server/crypto/test_engine.py b/kmip/tests/unit/services/server/crypto/test_engine.py index 225478f..c175d25 100644 --- a/kmip/tests/unit/services/server/crypto/test_engine.py +++ b/kmip/tests/unit/services/server/crypto/test_engine.py @@ -1008,10 +1008,9 @@ def test_handle_symmetric_padding_undo(symmetric_padding_parameters): # # https://www.ietf.org/rfc/rfc6070.txt # -# HMAC test vectors were obtained from IETF RFC 2202 and RFC 4231: +# HMAC test vectors were obtained from IETF RFC 5869: # -# https://tools.ietf.org/html/rfc2202 -# https://tools.ietf.org/html/rfc4231 +# https://tools.ietf.org/html/rfc5869 # # HASH test vectors for SHA1/SHA224/SHA256/SHA384/SHA512 # were obtained from the NIST CAVP test suite. Test vectors for MD5 were diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 240b359..d2aea30 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -43,6 +43,7 @@ from kmip.core.messages.payloads import activate from kmip.core.messages.payloads import revoke from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create_key_pair +from kmip.core.messages.payloads import derive_key from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import encrypt @@ -3477,6 +3478,637 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_derive_key(self): + """ + Test that a DeriveKey request can be processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + base_key = pie_objects.SymmetricKey( + algorithm=enums.CryptographicAlgorithm.HMAC_SHA256, + length=176, + value=( + b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b' + b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b' + b'\x0b\x0b\x0b\x0b\x0b\x0b' + ), + masks=[enums.CryptographicUsageMask.DERIVE_KEY] + ) + e._data_session.add(base_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + # Derive a SymmetricKey object. + payload = derive_key.DeriveKeyRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifiers=[str(base_key.unique_identifier)], + derivation_method=enums.DerivationMethod.HMAC, + derivation_parameters=attributes.DerivationParameters( + cryptographic_parameters=attributes.CryptographicParameters( + hashing_algorithm=enums.HashingAlgorithm.SHA_256 + ), + derivation_data=( + b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7' + b'\xf8\xf9' + ), + salt=( + b'\x00\x01\x02\x03\x04\x05\x06\x07' + b'\x08\x09\x0a\x0b\x0c' + ) + ), + template_attribute=objects.TemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 336 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + ] + ) + ) + + response_payload = e._process_derive_key(payload) + + e._logger.info.assert_any_call("Processing operation: DeriveKey") + e._logger.info.assert_any_call( + "Object 1 will be used as the keying material for the derivation " + "process." + ) + e._logger.info.assert_any_call("Created a SymmetricKey with ID: 2") + + self.assertEqual("2", response_payload.unique_identifier) + + managed_object = e._data_session.query( + pie_objects.SymmetricKey + ).filter( + pie_objects.SymmetricKey.unique_identifier == 2 + ).one() + + self.assertEqual( + ( + b'\x3c\xb2\x5f\x25\xfa\xac\xd5\x7a' + b'\x90\x43\x4f\x64\xd0\x36\x2f\x2a' + b'\x2d\x2d\x0a\x90\xcf\x1a\x5a\x4c' + b'\x5d\xb0\x2d\x56\xec\xc4\xc5\xbf' + b'\x34\x00\x72\x08\xd5\xb8\x87\x18' + b'\x58\x65' + ), + managed_object.value + ) + self.assertEqual( + enums.CryptographicAlgorithm.AES, + managed_object.cryptographic_algorithm + ) + self.assertEqual( + 336, + managed_object.cryptographic_length + ) + self.assertIsNotNone(managed_object.initial_date) + + e._logger.reset_mock() + + base_key = pie_objects.SymmetricKey( + algorithm=enums.CryptographicAlgorithm.BLOWFISH, + length=128, + value=( + b'\x01\x23\x45\x67\x89\xAB\xCD\xEF' + b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87' + ), + masks=[enums.CryptographicUsageMask.DERIVE_KEY] + ) + e._data_session.add(base_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + # Derive a SecretData object. + payload = derive_key.DeriveKeyRequestPayload( + object_type=enums.ObjectType.SECRET_DATA, + unique_identifiers=[str(base_key.unique_identifier)], + derivation_method=enums.DerivationMethod.ENCRYPT, + derivation_parameters=attributes.DerivationParameters( + cryptographic_parameters=attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + hashing_algorithm=enums.HashingAlgorithm.SHA_256, + cryptographic_algorithm=( + enums.CryptographicAlgorithm.BLOWFISH + ) + ), + initialization_vector=b'\xFE\xDC\xBA\x98\x76\x54\x32\x10', + derivation_data=( + b'\x37\x36\x35\x34\x33\x32\x31\x20' + b'\x4E\x6F\x77\x20\x69\x73\x20\x74' + b'\x68\x65\x20\x74\x69\x6D\x65\x20' + b'\x66\x6F\x72\x20\x00' + ), + ), + template_attribute=objects.TemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 256 + ) + ] + ) + ) + + response_payload = e._process_derive_key(payload) + + e._logger.info.assert_any_call("Processing operation: DeriveKey") + e._logger.info.assert_any_call( + "Object 3 will be used as the keying material for the derivation " + "process." + ) + e._logger.info.assert_any_call("Created a SecretData with ID: 4") + + self.assertEqual("4", response_payload.unique_identifier) + + managed_object = e._data_session.query( + pie_objects.SecretData + ).filter( + pie_objects.SecretData.unique_identifier == 4 + ).one() + + self.assertEqual( + ( + b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6' + b'\x05\xB1\x56\xE2\x74\x03\x97\x93' + b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9' + b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B' + ), + managed_object.value + ) + self.assertEqual(enums.SecretDataType.SEED, managed_object.data_type) + self.assertIsNotNone(managed_object.initial_date) + + def test_derive_key_invalid_derivation_type(self): + """ + Test that the right error is thrown when an invalid derivation type + is provided with a DeriveKey request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + payload = derive_key.DeriveKeyRequestPayload( + object_type=enums.ObjectType.CERTIFICATE + ) + + args = (payload, ) + self.assertRaisesRegexp( + exceptions.InvalidField, + "Key derivation can only generate a SymmetricKey or SecretData " + "object.", + e._process_derive_key, + *args + ) + + def test_derive_key_invalid_base_key(self): + """ + Test that the right error is thrown when an object not suitable for + key derivation is provided as the base key with a DeriveKey request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + invalid_key = pie_objects.OpaqueObject( + b'\x01\x02\x04\x08\x10\x20\x40\x80', + enums.OpaqueDataType.NONE + ) + e._data_session.add(invalid_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + payload = derive_key.DeriveKeyRequestPayload( + object_type=enums.ObjectType.SECRET_DATA, + unique_identifiers=[str(invalid_key.unique_identifier)] + ) + + args = (payload, ) + self.assertRaisesRegexp( + exceptions.InvalidField, + "Object 1 is not a suitable type for key derivation. Please " + "specify a key or secret data.", + e._process_derive_key, + *args + ) + + def test_derive_key_non_derivable_base_key(self): + """ + Test that the right error is thrown when an object suitable for + key derivation but not marked as such is provided as the base key + with a DeriveKey request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + base_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 128, + ( + b'\x00\x01\x02\x03\x04\x05\x06\x07' + b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F' + ), + [enums.CryptographicUsageMask.ENCRYPT] + ) + e._data_session.add(base_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + payload = derive_key.DeriveKeyRequestPayload( + object_type=enums.ObjectType.SECRET_DATA, + unique_identifiers=[str(base_key.unique_identifier)] + ) + + args = (payload, ) + self.assertRaisesRegexp( + exceptions.InvalidField, + "The DeriveKey bit must be set in the cryptographic usage mask " + "for object 1 for it to be used in key derivation.", + e._process_derive_key, + *args + ) + + def test_derive_key_alternate_derivation_data(self): + """ + Test that a DeriveKey request can be processed correctly by + specifying multiple base objects and no derivation data. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + base_key = pie_objects.SymmetricKey( + algorithm=enums.CryptographicAlgorithm.HMAC_SHA256, + length=176, + value=( + b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b' + b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b' + b'\x0b\x0b\x0b\x0b\x0b\x0b' + ), + masks=[enums.CryptographicUsageMask.DERIVE_KEY] + ) + e._data_session.add(base_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + base_data = pie_objects.SecretData( + value=( + b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7' + b'\xf8\xf9' + ), + data_type=enums.SecretDataType.SEED, + masks=[enums.CryptographicUsageMask.DERIVE_KEY] + ) + e._data_session.add(base_data) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + payload = derive_key.DeriveKeyRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifiers=[ + str(base_key.unique_identifier), + str(base_data.unique_identifier) + ], + derivation_method=enums.DerivationMethod.HMAC, + derivation_parameters=attributes.DerivationParameters( + cryptographic_parameters=attributes.CryptographicParameters( + hashing_algorithm=enums.HashingAlgorithm.SHA_256 + ), + salt=( + b'\x00\x01\x02\x03\x04\x05\x06\x07' + b'\x08\x09\x0a\x0b\x0c' + ) + ), + template_attribute=objects.TemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 336 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + ] + ) + ) + + response_payload = e._process_derive_key(payload) + + e._logger.info.assert_any_call("Processing operation: DeriveKey") + e._logger.info.assert_any_call( + "2 derivation objects specified with the DeriveKey request." + ) + e._logger.info.assert_any_call( + "Object 1 will be used as the keying material for the derivation " + "process." + ) + e._logger.info.assert_any_call( + "Object 2 will be used as the derivation data for the derivation " + "process." + ) + e._logger.info.assert_any_call("Created a SymmetricKey with ID: 3") + + self.assertEqual("3", response_payload.unique_identifier) + + managed_object = e._data_session.query( + pie_objects.SymmetricKey + ).filter( + pie_objects.SymmetricKey.unique_identifier == 3 + ).one() + + self.assertEqual( + ( + b'\x3c\xb2\x5f\x25\xfa\xac\xd5\x7a' + b'\x90\x43\x4f\x64\xd0\x36\x2f\x2a' + b'\x2d\x2d\x0a\x90\xcf\x1a\x5a\x4c' + b'\x5d\xb0\x2d\x56\xec\xc4\xc5\xbf' + b'\x34\x00\x72\x08\xd5\xb8\x87\x18' + b'\x58\x65' + ), + managed_object.value + ) + self.assertEqual( + enums.CryptographicAlgorithm.AES, + managed_object.cryptographic_algorithm + ) + self.assertEqual( + 336, + managed_object.cryptographic_length + ) + self.assertIsNotNone(managed_object.initial_date) + + def test_derive_key_unspecified_iv(self): + """ + """ + self.skip('') + + def test_derive_key_missing_cryptographic_length(self): + """ + Test that the right error is thrown when the cryptographic length is + missing from a DeriveKey request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + base_key = pie_objects.SymmetricKey( + algorithm=enums.CryptographicAlgorithm.HMAC_SHA256, + length=160, + value=( + b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b' + b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b' + b'\x0b\x0b\x0b\x0b' + ), + masks=[enums.CryptographicUsageMask.DERIVE_KEY] + ) + e._data_session.add(base_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + payload = derive_key.DeriveKeyRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifiers=[str(base_key.unique_identifier)], + derivation_method=enums.DerivationMethod.HMAC, + derivation_parameters=attributes.DerivationParameters( + cryptographic_parameters=attributes.CryptographicParameters( + hashing_algorithm=enums.HashingAlgorithm.SHA_256 + ), + derivation_data=b'\x48\x69\x20\x54\x68\x65\x72\x65', + ), + template_attribute=objects.TemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + ] + ) + ) + + args = (payload, ) + self.assertRaisesRegexp( + exceptions.InvalidField, + "The cryptographic length must be provided in the template " + "attribute.", + e._process_derive_key, + *args + ) + + def test_derive_key_invalid_cryptographic_length(self): + """ + Test that the right error is thrown when an invalid cryptographic + length is provided with a DeriveKey request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + base_key = pie_objects.SymmetricKey( + algorithm=enums.CryptographicAlgorithm.HMAC_SHA256, + length=160, + value=( + b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b' + b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b' + b'\x0b\x0b\x0b\x0b' + ), + masks=[enums.CryptographicUsageMask.DERIVE_KEY] + ) + e._data_session.add(base_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + payload = derive_key.DeriveKeyRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifiers=[str(base_key.unique_identifier)], + derivation_method=enums.DerivationMethod.HMAC, + derivation_parameters=attributes.DerivationParameters( + cryptographic_parameters=attributes.CryptographicParameters( + hashing_algorithm=enums.HashingAlgorithm.SHA_256 + ), + derivation_data=b'\x48\x69\x20\x54\x68\x65\x72\x65', + ), + template_attribute=objects.TemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 123 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + ] + ) + ) + + args = (payload, ) + self.assertRaisesRegexp( + exceptions.InvalidField, + "The cryptographic length must correspond to a valid number of " + "bytes \(i.e., it must be a multiple of 8\).", + e._process_derive_key, + *args + ) + + def test_derive_key_missing_cryptographic_algorithm(self): + """ + Test that the right error is thrown when the cryptographic algorithm + is missing from a DeriveKey request when deriving a symmetric key. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + base_key = pie_objects.SymmetricKey( + algorithm=enums.CryptographicAlgorithm.HMAC_SHA256, + length=160, + value=( + b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b' + b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b' + b'\x0b\x0b\x0b\x0b' + ), + masks=[enums.CryptographicUsageMask.DERIVE_KEY] + ) + e._data_session.add(base_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + attribute_factory = factory.AttributeFactory() + + payload = derive_key.DeriveKeyRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifiers=[str(base_key.unique_identifier)], + derivation_method=enums.DerivationMethod.HMAC, + derivation_parameters=attributes.DerivationParameters( + cryptographic_parameters=attributes.CryptographicParameters( + hashing_algorithm=enums.HashingAlgorithm.SHA_256 + ), + derivation_data=b'\x48\x69\x20\x54\x68\x65\x72\x65', + ), + template_attribute=objects.TemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 256 + ) + ] + ) + ) + + args = (payload, ) + self.assertRaisesRegexp( + exceptions.InvalidField, + "The cryptographic algorithm must be provided in the template " + "attribute when deriving a symmetric key.", + e._process_derive_key, + *args + ) + + def test_derive_key_oversized_cryptographic_length(self): + """ + Test that the right error is thrown when an invalid cryptographic + length is provided with a DeriveKey request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._cryptography_engine.logger = mock.MagicMock() + + base_key = pie_objects.SymmetricKey( + algorithm=enums.CryptographicAlgorithm.HMAC_SHA256, + length=160, + value=( + b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b' + b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b' + b'\x0b\x0b\x0b\x0b' + ), + masks=[enums.CryptographicUsageMask.DERIVE_KEY] + ) + e._data_session.add(base_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._cryptography_engine = mock.MagicMock() + e._cryptography_engine.derive_key.return_value = b'' + + attribute_factory = factory.AttributeFactory() + + payload = derive_key.DeriveKeyRequestPayload( + object_type=enums.ObjectType.SYMMETRIC_KEY, + unique_identifiers=[str(base_key.unique_identifier)], + derivation_method=enums.DerivationMethod.HMAC, + derivation_parameters=attributes.DerivationParameters( + cryptographic_parameters=attributes.CryptographicParameters( + hashing_algorithm=enums.HashingAlgorithm.SHA_256 + ), + derivation_data=b'\x48\x69\x20\x54\x68\x65\x72\x65', + ), + template_attribute=objects.TemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 256 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + ] + ) + ) + + args = (payload, ) + self.assertRaisesRegexp( + exceptions.CryptographicFailure, + "The specified length exceeds the output of the derivation " + "method.", + e._process_derive_key, + *args + ) + def test_locate(self): """ Test that a Locate request can be processed correctly. @@ -5620,7 +6252,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(10, len(result.operations)) + self.assertEqual(11, len(result.operations)) self.assertEqual( enums.Operation.CREATE, result.operations[0].value @@ -5634,33 +6266,37 @@ class TestKmipEngine(testtools.TestCase): result.operations[2].value ) self.assertEqual( - enums.Operation.LOCATE, + enums.Operation.DERIVE_KEY, result.operations[3].value ) self.assertEqual( - enums.Operation.GET, + enums.Operation.LOCATE, result.operations[4].value ) self.assertEqual( - enums.Operation.GET_ATTRIBUTES, + enums.Operation.GET, result.operations[5].value ) self.assertEqual( - enums.Operation.GET_ATTRIBUTE_LIST, + enums.Operation.GET_ATTRIBUTES, result.operations[6].value ) self.assertEqual( - enums.Operation.ACTIVATE, + enums.Operation.GET_ATTRIBUTE_LIST, result.operations[7].value ) self.assertEqual( - enums.Operation.DESTROY, + enums.Operation.ACTIVATE, result.operations[8].value ) self.assertEqual( - enums.Operation.QUERY, + enums.Operation.DESTROY, result.operations[9].value ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[10].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -5698,7 +6334,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(11, len(result.operations)) + self.assertEqual(12, len(result.operations)) self.assertEqual( enums.Operation.CREATE, result.operations[0].value @@ -5712,37 +6348,41 @@ class TestKmipEngine(testtools.TestCase): result.operations[2].value ) self.assertEqual( - enums.Operation.LOCATE, + enums.Operation.DERIVE_KEY, result.operations[3].value ) self.assertEqual( - enums.Operation.GET, + enums.Operation.LOCATE, result.operations[4].value ) self.assertEqual( - enums.Operation.GET_ATTRIBUTES, + enums.Operation.GET, result.operations[5].value ) self.assertEqual( - enums.Operation.GET_ATTRIBUTE_LIST, + enums.Operation.GET_ATTRIBUTES, result.operations[6].value ) self.assertEqual( - enums.Operation.ACTIVATE, + enums.Operation.GET_ATTRIBUTE_LIST, result.operations[7].value ) self.assertEqual( - enums.Operation.DESTROY, + enums.Operation.ACTIVATE, result.operations[8].value ) self.assertEqual( - enums.Operation.QUERY, + enums.Operation.DESTROY, result.operations[9].value ) self.assertEqual( - enums.Operation.DISCOVER_VERSIONS, + enums.Operation.QUERY, result.operations[10].value ) + self.assertEqual( + enums.Operation.DISCOVER_VERSIONS, + result.operations[11].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -5780,7 +6420,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(13, len(result.operations)) + self.assertEqual(14, len(result.operations)) self.assertEqual( enums.Operation.CREATE, result.operations[0].value @@ -5794,45 +6434,49 @@ class TestKmipEngine(testtools.TestCase): result.operations[2].value ) self.assertEqual( - enums.Operation.LOCATE, + enums.Operation.DERIVE_KEY, result.operations[3].value ) self.assertEqual( - enums.Operation.GET, + enums.Operation.LOCATE, result.operations[4].value ) self.assertEqual( - enums.Operation.GET_ATTRIBUTES, + enums.Operation.GET, result.operations[5].value ) self.assertEqual( - enums.Operation.GET_ATTRIBUTE_LIST, + enums.Operation.GET_ATTRIBUTES, result.operations[6].value ) self.assertEqual( - enums.Operation.ACTIVATE, + enums.Operation.GET_ATTRIBUTE_LIST, result.operations[7].value ) self.assertEqual( - enums.Operation.DESTROY, + enums.Operation.ACTIVATE, result.operations[8].value ) self.assertEqual( - enums.Operation.QUERY, + enums.Operation.DESTROY, result.operations[9].value ) self.assertEqual( - enums.Operation.DISCOVER_VERSIONS, + enums.Operation.QUERY, result.operations[10].value ) self.assertEqual( - enums.Operation.ENCRYPT, + enums.Operation.DISCOVER_VERSIONS, result.operations[11].value ) self.assertEqual( - enums.Operation.MAC, + enums.Operation.ENCRYPT, result.operations[12].value ) + self.assertEqual( + enums.Operation.MAC, + result.operations[13].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual(