Add Encrypt support to the clients

This change adds Encrypt operation support to the KMIPProxy and
ProxyKmipClient clients, including unit tests to cover the new
functionality.

Extensive documentation has been added to the header comments for
the new client methods detailing the expected input parameters and
return values. This approach should be followed for all new client
additions going forward.
This commit is contained in:
Peter Hamilton 2017-06-20 10:47:53 -04:00
parent 7b8bd4769c
commit a5e960153c
6 changed files with 482 additions and 0 deletions

View File

@ -139,6 +139,25 @@ class KmipClient:
"""
pass
@abc.abstractmethod
def encrypt(self, data, uid=None, cryptographic_parameters=None,
iv_counter_nonce=None):
"""
Encrypt data using the specified encryption key and parameters.
Args:
data (bytes): The bytes to encrypt. Required.
uid (string): The unique ID of the encryption key to use.
Optional, defaults to None.
cryptographic_parameters (dict): A dictionary containing various
cryptographic settings to be used for the encryption.
Optional, defaults to None.
iv_counter_nonce (bytes): The bytes to use for the IV/counter/
nonce, if needed by the encryption algorithm and/or cipher
mode. Optional, defaults to None.
"""
pass
@abc.abstractmethod
def mac(self, data, uid, algorithm):
"""

View File

@ -649,6 +649,142 @@ class ProxyKmipClient(api.KmipClient):
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
def encrypt(self, data, uid=None, cryptographic_parameters=None,
iv_counter_nonce=None):
"""
Encrypt data using the specified encryption key and parameters.
Args:
data (bytes): The bytes to encrypt. Required.
uid (string): The unique ID of the encryption key to use.
Optional, defaults to None.
cryptographic_parameters (dict): A dictionary containing various
cryptographic settings to be used for the encryption.
Optional, defaults to None.
iv_counter_nonce (bytes): The bytes to use for the IV/counter/
nonce, if needed by the encryption algorithm and/or cipher
mode. Optional, defaults to None.
Returns:
bytes: The encrypted data.
bytes: The IV/counter/nonce used with the encryption algorithm,
only if it was autogenerated by the server.
Raises:
ClientConnectionNotOpen: if the client connection is unusable
KmipOperationFailure: if the operation result is a failure
TypeError: if the input arguments are invalid
Notes:
The cryptographic_parameters argument is a dictionary that can
contain the following key/value pairs:
Keys | Value
------------------------------|-----------------------------------
'block_cipher_mode' | A BlockCipherMode enumeration
| indicating the cipher mode to use
| with the encryption algorithm.
'padding_method' | A PaddingMethod enumeration
| indicating which padding method to
| use with the encryption algorithm.
'hashing_algorithm' | A HashingAlgorithm enumeration
| indicating which hashing algorithm
| to use.
'key_role_type' | A KeyRoleType enumeration
| indicating the intended use of the
| associated cryptographic key.
'digital_signature_algorithm' | A DigitalSignatureAlgorithm
| enumeration indicating which
| digital signature algorithm to
| use.
'cryptographic_algorithm' | A CryptographicAlgorithm
| enumeration indicating which
| encryption algorithm to use.
'random_iv' | A boolean indicating whether the
| server should autogenerate an IV.
'iv_length' | An integer representing the length
| of the initialization vector (IV)
| in bits.
'tag_length' | An integer representing the length
| of the authenticator tag in bytes.
'fixed_field_length' | An integer representing the length
| of the fixed field portion of the
| IV in bits.
'invocation_field_length' | An integer representing the length
| of the invocation field portion of
| the IV in bits.
'counter_length' | An integer representing the length
| of the coutner portion of the IV
| in bits.
'initial_counter_value' | An integer representing the
| starting counter value for CTR
| mode (typically 1).
"""
# Check input
if not isinstance(data, six.binary_type):
raise TypeError("data must be bytes")
if uid is not None:
if not isinstance(uid, six.string_types):
raise TypeError("uid must be a string")
if cryptographic_parameters is not None:
if not isinstance(cryptographic_parameters, dict):
raise TypeError("cryptographic_parameters must be a dict")
if iv_counter_nonce is not None:
if not isinstance(iv_counter_nonce, six.binary_type):
raise TypeError("iv_counter_nonce must be bytes")
# Verify that operations can be given at this time
if not self._is_open:
raise exceptions.ClientConnectionNotOpen()
cryptographic_parameters = CryptographicParameters(
block_cipher_mode=cryptographic_parameters.get(
'block_cipher_mode'
),
padding_method=cryptographic_parameters.get('padding_method'),
hashing_algorithm=cryptographic_parameters.get(
'hashing_algorithm'
),
key_role_type=cryptographic_parameters.get('key_role_type'),
digital_signature_algorithm=cryptographic_parameters.get(
'digital_signature_algorithm'
),
cryptographic_algorithm=cryptographic_parameters.get(
'cryptographic_algorithm'
),
random_iv=cryptographic_parameters.get('random_iv'),
iv_length=cryptographic_parameters.get('iv_length'),
tag_length=cryptographic_parameters.get('tag_length'),
fixed_field_length=cryptographic_parameters.get(
'fixed_field_length'
),
invocation_field_length=cryptographic_parameters.get(
'invocation_field_length'
),
counter_length=cryptographic_parameters.get('counter_length'),
initial_counter_value=cryptographic_parameters.get(
'initial_counter_value'
)
)
# Encrypt the provided data and handle the results
result = self.proxy.encrypt(
data,
uid,
cryptographic_parameters,
iv_counter_nonce
)
status = result.get('result_status')
if status == enums.ResultStatus.SUCCESS:
return result.get('data'), result.get('iv_counter_nonce')
else:
raise exceptions.KmipOperationFailure(
status,
result.get('result_reason'),
result.get('result_message')
)
def mac(self, data, uid=None, algorithm=None):
"""
Get the message authentication code for data.

View File

@ -53,6 +53,7 @@ from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import encrypt
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import get_attributes
from kmip.core.messages.payloads import get_attribute_list
@ -433,6 +434,78 @@ class KMIPProxy(KMIP):
results = self._process_batch_items(response)
return results[0]
def encrypt(self,
data,
unique_identifier=None,
cryptographic_parameters=None,
iv_counter_nonce=None,
credential=None):
"""
Encrypt data using the specified encryption key and parameters.
Args:
data (bytes): The bytes to encrypt. Required.
unique_identifier (string): The unique ID of the encryption key
to use. Optional, defaults to None.
cryptographic_parameters (CryptographicParameters): A structure
containing various cryptographic settings to be used for the
encryption. Optional, defaults to None.
iv_counter_nonce (bytes): The bytes to use for the IV/counter/
nonce, if needed by the encryption algorithm and/or cipher
mode. Optional, defaults to None.
credential (Credential): A credential object containing a set of
authorization parameters for the operation. Optional, defaults
to None.
Returns:
dict: The results of the encrypt operation, containing the
following key/value pairs:
Key | Value
--------------------|-----------------------------------------
'unique_identifier' | (string) The unique ID of the encryption
| key used to encrypt the data.
'data' | (bytes) The encrypted data.
'iv_counter_nonce' | (bytes) The IV/counter/nonce used for
| the encryption, if autogenerated.
'result_status' | (ResultStatus) An enumeration indicating
| the status of the operation result.
'result_reason' | (ResultReason) An enumeration providing
| context for the result status.
'result_message' | (string) A message providing additional
| context for the operation result.
"""
operation = Operation(OperationEnum.ENCRYPT)
request_payload = encrypt.EncryptRequestPayload(
unique_identifier=unique_identifier,
data=data,
cryptographic_parameters=cryptographic_parameters,
iv_counter_nonce=iv_counter_nonce
)
batch_item = messages.RequestBatchItem(
operation=operation,
request_payload=request_payload
)
request = self._build_request_message(credential, [batch_item])
response = self._send_and_receive_message(request)
batch_item = response.batch_items[0]
payload = batch_item.response_payload
result = {}
if payload:
result['unique_identifier'] = payload.unique_identifier
result['data'] = payload.data
result['iv_counter_nonce'] = payload.iv_counter_nonce
result['result_status'] = batch_item.result_status
result['result_reason'] = batch_item.result_reason
result['result_message'] = batch_item.result_message
return result
def mac(self, data, unique_identifier=None,
cryptographic_parameters=None, credential=None):
return self._mac(

View File

@ -59,6 +59,18 @@ class DummyKmipClient(api.KmipClient):
def destroy(self, uid):
super(DummyKmipClient, self).destroy(uid)
def encrypt(self,
data,
uid=None,
cryptographic_parameters=None,
iv_counter_nonce=None):
super(DummyKmipClient, self).encrypt(
data,
uid,
cryptographic_parameters,
iv_counter_nonce
)
def mac(self, data, uid, algorithm):
super(DummyKmipClient, self).mac(data, uid, algorithm)
@ -147,6 +159,14 @@ class TestKmipClient(testtools.TestCase):
dummy = DummyKmipClient()
dummy.destroy('uid')
def test_encrypt(self):
"""
Test that the encrypt method can be called without error.
:return:
"""
dummy = DummyKmipClient()
dummy.encrypt('data', 'uid', 'crypto_params', 'iv')
def test_mac(self):
"""
Test that the mac method can be called without error.

View File

@ -1318,6 +1318,175 @@ class TestProxyKmipClient(testtools.TestCase):
self.assertEqual(opn.attribute_name.value, 'Operation Policy Name')
self.assertEqual(opn.attribute_value.value, 'test')
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_encrypt(self):
"""
Test that the client can encrypt data.
"""
result = {
'data': (
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
),
'iv_counter_nonce': None,
'result_status': enums.ResultStatus.SUCCESS
}
client = ProxyKmipClient()
client.open()
client.proxy.encrypt.return_value = result
encrypted_data, iv_counter_nonce = client.encrypt(
(
b'\x37\x36\x35\x34\x33\x32\x31\x20'
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
b'\x66\x6F\x72\x20\x00'
),
uid='1',
cryptographic_parameters={
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.PKCS5,
'cryptographic_algorithm':
enums.CryptographicAlgorithm.BLOWFISH
},
iv_counter_nonce=b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
)
self.assertEqual(result.get('data'), encrypted_data)
self.assertEqual(result.get('iv_counter_nonce'), iv_counter_nonce)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_encrypt_on_invalid_inputs(self):
"""
Test that TypeError exception are raised when trying to encrypt with
invalid parameters.
"""
client = ProxyKmipClient()
client.open()
client.proxy.encrypt.return_value = {}
args = [None]
kwargs = {
'uid': '1',
'cryptographic_parameters': {},
'iv_counter_nonce': b'\x00\x00\x00\x00'
}
self.assertRaisesRegexp(
TypeError,
"data must be bytes",
client.encrypt,
*args,
**kwargs
)
args = [b'\x01\x02\x03\x04']
kwargs = {
'uid': 1,
'cryptographic_parameters': {},
'iv_counter_nonce': b'\x00\x00\x00\x00'
}
self.assertRaisesRegexp(
TypeError,
"uid must be a string",
client.encrypt,
*args,
**kwargs
)
args = [b'\x01\x02\x03\x04']
kwargs = {
'uid': '1',
'cryptographic_parameters': 'invalid',
'iv_counter_nonce': b'\x00\x00\x00\x00'
}
self.assertRaisesRegexp(
TypeError,
"cryptographic_parameters must be a dict",
client.encrypt,
*args,
**kwargs
)
args = [b'\x01\x02\x03\x04']
kwargs = {
'uid': '1',
'cryptographic_parameters': {},
'iv_counter_nonce': {}
}
self.assertRaisesRegexp(
TypeError,
"iv_counter_nonce must be bytes",
client.encrypt,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_encrypt_on_closed(self):
"""
Test that a ClientConnectionNotOpen exception is raised when trying
to encrypt data on an unopened client connection.
"""
client = ProxyKmipClient()
args = [b'\x01\x02\x03\x04']
kwargs = {
'uid': '1',
'cryptographic_parameters': {},
'iv_counter_nonce': b'\x00\x00\x00\x00'
}
self.assertRaises(
ClientConnectionNotOpen,
client.encrypt,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_encrypt_on_operation_failure(self):
"""
Test that a KmipOperationFailure exception is raised when the
backend fails to encrypt data.
"""
status = enums.ResultStatus.OPERATION_FAILED
reason = enums.ResultReason.GENERAL_FAILURE
message = "Test failure message"
result = {
'result_status': status,
'result_reason': reason,
'result_message': message
}
error_message = str(KmipOperationFailure(status, reason, message))
client = ProxyKmipClient()
client.open()
client.proxy.encrypt.return_value = result
args = [b'\x01\x02\x03\x04']
kwargs = {
'uid': '1',
'cryptographic_parameters': {},
'iv_counter_nonce': b'\x00\x00\x00\x00'
}
self.assertRaisesRegexp(
KmipOperationFailure,
error_message,
client.encrypt,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_mac(self):

View File

@ -19,6 +19,7 @@ from kmip.core.attributes import PrivateKeyUniqueIdentifier
from kmip.core.attributes import CryptographicParameters
from kmip.core import enums
from kmip.core.enums import AuthenticationSuite
from kmip.core.enums import ConformanceClause
from kmip.core.enums import CredentialType
@ -45,6 +46,7 @@ from kmip.core.messages.payloads.create_key_pair import \
CreateKeyPairRequestPayload, CreateKeyPairResponsePayload
from kmip.core.messages.payloads.discover_versions import \
DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload
from kmip.core.messages.payloads import encrypt
from kmip.core.messages.payloads import get_attributes
from kmip.core.messages.payloads import get_attribute_list
from kmip.core.messages.payloads.query import \
@ -718,6 +720,69 @@ class TestKMIPClient(TestCase):
self.client._create_socket(sock)
self.assertEqual(ssl.SSLSocket, type(self.client.socket))
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._build_request_message'
)
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._send_and_receive_message'
)
def test_encrypt(self, send_mock, build_mock):
"""
Test that the client can encrypt data.
"""
payload = encrypt.EncryptResponsePayload(
unique_identifier='1',
data=(
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
)
)
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.ENCRYPT),
result_status=ResultStatus(ResultStatusEnum.SUCCESS),
response_payload=payload
)
response = ResponseMessage(batch_items=[batch_item])
build_mock.return_value = None
send_mock.return_value = response
result = self.client.encrypt(
(
b'\x37\x36\x35\x34\x33\x32\x31\x20'
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
b'\x66\x6F\x72\x20\x00'
),
unique_identifier='1',
cryptographic_parameters=CryptographicParameters(
block_cipher_mode=enums.BlockCipherMode.CBC,
padding_method=enums.PaddingMethod.PKCS5,
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
),
iv_counter_nonce=b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
)
self.assertEqual('1', result.get('unique_identifier'))
self.assertEqual(
(
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
),
result.get('data')
)
self.assertEqual(None, result.get('iv_counter_nonce'))
self.assertEqual(
ResultStatusEnum.SUCCESS,
result.get('result_status').value
)
self.assertEqual(None, result.get('result_reason'))
self.assertEqual(None, result.get('result_message'))
@mock.patch('kmip.services.kmip_client.KMIPProxy._send_message',
mock.MagicMock())
@mock.patch('kmip.services.kmip_client.KMIPProxy._receive_message',