From c59cfeeef980e7fe560efeb18d730e49510de3e5 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Thu, 1 Dec 2016 18:21:07 -0500 Subject: [PATCH] Adding server support for the GetAttributes operation This change adds server support for the GetAttributes operation. The user can specify an object ID and an attribute list to get the values of the attributes listed from the specified object. The user can also omit either argument; the server will default to using the ID placeholder and all viable attributes respectively. Only a subset of the standard attributes are supported right now. New tests have been added to cover the new feature. Closes #185 Closes #211 Closes #218 --- kmip/core/messages/payloads/get_attributes.py | 124 ++-- kmip/core/primitives.py | 5 +- kmip/services/server/engine.py | 190 ++++++- kmip/services/server/policy.py | 9 + .../messages/payloads/test_get_attributes.py | 274 ++++----- .../tests/unit/services/server/test_engine.py | 528 +++++++++++++++++- .../tests/unit/services/server/test_policy.py | 54 ++ 7 files changed, 991 insertions(+), 193 deletions(-) diff --git a/kmip/core/messages/payloads/get_attributes.py b/kmip/core/messages/payloads/get_attributes.py index 530f6f7..fd6e05d 100644 --- a/kmip/core/messages/payloads/get_attributes.py +++ b/kmip/core/messages/payloads/get_attributes.py @@ -31,19 +31,19 @@ class GetAttributesRequestPayload(primitives.Struct): duplicates in the attribute name list. Attributes: - uid: The unique ID of the managed object with which the retrieved - attributes should be associated. + unique_identifier: The unique ID of the managed object with which the + retrieved attributes should be associated. attribute_names: A list of strings identifying the names of the attributes associated with the managed object. """ - def __init__(self, uid=None, attribute_names=None): + def __init__(self, unique_identifier=None, attribute_names=None): """ Construct a GetAttributes request payload. Args: - uid (string): The ID of the managed object with which the - retrieved attributes should be associated. Optional, defaults - to None. + unique_identifier (string): The ID of the managed object with + which the retrieved attributes should be associated. Optional, + defaults to None. attribute_names: A list of strings identifying the names of the attributes associated with the managed object. Optional, defaults to None. @@ -51,30 +51,30 @@ class GetAttributesRequestPayload(primitives.Struct): super(GetAttributesRequestPayload, self).__init__( enums.Tags.REQUEST_PAYLOAD) - self._uid = None + self._unique_identifier = None self._attribute_names = list() - self.uid = uid + self.unique_identifier = unique_identifier self.attribute_names = attribute_names @property - def uid(self): - if self._uid: - return self._uid.value + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value else: - return self._uid + return self._unique_identifier - @uid.setter - def uid(self, value): + @unique_identifier.setter + def unique_identifier(self, value): if value is None: - self._uid = None + self._unique_identifier = None elif isinstance(value, six.string_types): - self._uid = primitives.TextString( + self._unique_identifier = primitives.TextString( value=value, tag=enums.Tags.UNIQUE_IDENTIFIER ) else: - raise TypeError("uid must be a string") + raise TypeError("unique identifier must be a string") @property def attribute_names(self): @@ -125,10 +125,12 @@ class GetAttributesRequestPayload(primitives.Struct): tstream = utils.BytearrayStream(istream.read(self.length)) if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream): - self._uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER) - self._uid.read(tstream) + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(tstream) else: - self._uid = None + self._unique_identifier = None names = list() while self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, tstream): @@ -150,8 +152,8 @@ class GetAttributesRequestPayload(primitives.Struct): """ tstream = utils.BytearrayStream() - if self._uid: - self._uid.write(tstream) + if self._unique_identifier: + self._unique_identifier.write(tstream) for attribute_name in self._attribute_names: attribute_name.write(tstream) @@ -161,22 +163,24 @@ class GetAttributesRequestPayload(primitives.Struct): ostream.write(tstream.buffer) def __repr__(self): - uid = "uid={0}".format(self.uid) + unique_identifier = "unique_identifier={0}".format( + self.unique_identifier + ) attribute_names = "attribute_names={0}".format(self.attribute_names) return "GetAttributesRequestPayload({0}, {1})".format( - uid, + unique_identifier, attribute_names ) def __str__(self): return str({ - 'uid': self.uid, + 'unique_identifier': self.unique_identifier, 'attribute_names': self.attribute_names }) def __eq__(self, other): if isinstance(other, GetAttributesRequestPayload): - if self.uid == other.uid: + if self.unique_identifier == other.unique_identifier: if set(self.attribute_names) == set(other.attribute_names): return True else: @@ -202,49 +206,49 @@ class GetAttributesResponsePayload(primitives.Struct): associated with the aforementioned managed object. Attributes: - uid: The unique ID of the managed object with which the retrieved - attributes should be associated. + unique_identifier: The unique ID of the managed object with which + the retrieved attributes should be associated. attributes: The list of attributes associated with managed object - identified by the uid above. + identified by the unique identifier above. """ - def __init__(self, uid=None, attributes=None): + def __init__(self, unique_identifier=None, attributes=None): """ Construct a GetAttributes response payload. Args: - uid (string): The ID of the managed object with which the - retrieved attributes should be associated. Optional, defaults - to None. + unique_identifier (string): The ID of the managed object with + which the retrieved attributes should be associated. Optional, + defaults to None. attributes (list): A list of attribute structures associated with the managed object. Optional, defaults to None. """ super(GetAttributesResponsePayload, self).__init__( enums.Tags.RESPONSE_PAYLOAD) - self._uid = None + self._unique_identifier = None self._attributes = list() - self.uid = uid + self.unique_identifier = unique_identifier self.attributes = attributes @property - def uid(self): - if self._uid: - return self._uid.value + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value else: - return self._uid + return self._unique_identifier - @uid.setter - def uid(self, value): + @unique_identifier.setter + def unique_identifier(self, value): if value is None: - self._uid = None + self._unique_identifier = None elif isinstance(value, six.string_types): - self._uid = primitives.TextString( + self._unique_identifier = primitives.TextString( value=value, tag=enums.Tags.UNIQUE_IDENTIFIER ) else: - raise TypeError("uid must be a string") + raise TypeError("unique identifier must be a string") @property def attributes(self): @@ -279,12 +283,14 @@ class GetAttributesResponsePayload(primitives.Struct): tstream = utils.BytearrayStream(istream.read(self.length)) if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream): - uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER) - uid.read(tstream) - self.uid = uid.value + unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + unique_identifier.read(tstream) + self.unique_identifier = unique_identifier.value else: raise exceptions.InvalidKmipEncoding( - "expected GetAttributes response uid not found" + "expected GetAttributes response unique identifier not found" ) self._attributes = list() @@ -306,11 +312,11 @@ class GetAttributesResponsePayload(primitives.Struct): """ tstream = utils.BytearrayStream() - if self._uid: - self._uid.write(tstream) + if self._unique_identifier: + self._unique_identifier.write(tstream) else: raise exceptions.InvalidField( - "The GetAttributes response uid is required." + "The GetAttributes response unique identifier is required." ) for attribute in self._attributes: @@ -321,16 +327,24 @@ class GetAttributesResponsePayload(primitives.Struct): ostream.write(tstream.buffer) def __repr__(self): - uid = "uid={0}".format(self.uid) + unique_identifier = "unique_identifier={0}".format( + self.unique_identifier + ) names = "attributes={0}".format(self.attributes) - return "GetAttributesResponsePayload({0}, {1})".format(uid, names) + return "GetAttributesResponsePayload({0}, {1})".format( + unique_identifier, + names + ) def __str__(self): - return str({'uid': self.uid, 'attributes': self.attributes}) + return str({ + 'unique_identifier': self.unique_identifier, + 'attributes': self.attributes + }) def __eq__(self, other): if isinstance(other, GetAttributesResponsePayload): - if self.uid != other.uid: + if self.unique_identifier != other.unique_identifier: return False if len(self._attributes) != len(other._attributes): return False diff --git a/kmip/core/primitives.py b/kmip/core/primitives.py index a95f013..8dc7730 100644 --- a/kmip/core/primitives.py +++ b/kmip/core/primitives.py @@ -825,11 +825,10 @@ class TextString(Base): def __validate(self): if self.value is not None: - data_type = type(self.value) - if data_type is not str: + if not isinstance(self.value, six.string_types): msg = ErrorStrings.BAD_EXP_RECV raise TypeError(msg.format('TextString', 'value', str, - data_type)) + type(self.value))) def __repr__(self): return "{0}(value={1})".format(type(self).__name__, repr(self.value)) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 27e7341..f62d24a 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -29,6 +29,8 @@ import kmip from kmip.core import attributes from kmip.core import enums from kmip.core import exceptions + +from kmip.core.factories import attributes as attribute_factory from kmip.core.factories import secrets from kmip.core.messages import contents @@ -40,6 +42,7 @@ from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get +from kmip.core.messages.payloads import get_attributes from kmip.core.messages.payloads import query from kmip.core.messages.payloads import register @@ -492,7 +495,6 @@ class KmipEngine(object): unique_identifier ) ) - self._logger.exception(e) raise exceptions.ItemNotFound( "Could not locate object: {0}".format(unique_identifier) ) @@ -624,6 +626,148 @@ class KmipEngine(object): return attributes + def _get_attributes_from_managed_object(self, managed_object, attr_names): + """ + Given a kmip.pie object and a list of attribute names, attempt to get + all of the existing attribute values from the object. + """ + attr_factory = attribute_factory.AttributeFactory() + retrieved_attributes = list() + + if not attr_names: + attr_names = self._attribute_policy.get_all_attribute_names() + + for attribute_name in attr_names: + object_type = managed_object._object_type + + if not self._attribute_policy.is_attribute_supported( + attribute_name + ): + continue + + if self._attribute_policy.is_attribute_applicable_to_object_type( + attribute_name, + object_type + ): + attribute_value = self._get_attribute_from_managed_object( + managed_object, + attribute_name + ) + + if attribute_value is not None: + if self._attribute_policy.is_attribute_multivalued( + attribute_name + ): + for count, value in enumerate(attribute_value): + attribute = attr_factory.create_attribute( + enums.AttributeType(attribute_name), + value, + count + ) + retrieved_attributes.append(attribute) + else: + attribute = attr_factory.create_attribute( + enums.AttributeType(attribute_name), + attribute_value + ) + retrieved_attributes.append(attribute) + + return retrieved_attributes + + def _get_attribute_from_managed_object(self, managed_object, attr_name): + """ + Get the attribute value from the kmip.pie managed object. + """ + if attr_name == 'Unique Identifier': + return str(managed_object.unique_identifier) + elif attr_name == 'Name': + names = list() + for name in managed_object.names: + name = attributes.Name( + attributes.Name.NameValue(name), + attributes.Name.NameType( + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + names.append(name) + return names + elif attr_name == 'Object Type': + return managed_object._object_type + elif attr_name == 'Cryptographic Algorithm': + return managed_object.cryptographic_algorithm + elif attr_name == 'Cryptographic Length': + return managed_object.cryptographic_length + elif attr_name == 'Cryptographic Parameters': + return None + elif attr_name == 'Cryptographic Domain Parameters': + return None + elif attr_name == 'Certificate Type': + return managed_object.certificate_type + elif attr_name == 'Certificate Length': + return None + elif attr_name == 'X.509 Certificate Identifier': + return None + elif attr_name == 'X.509 Certificate Subject': + return None + elif attr_name == 'X.509 Certificate Issuer': + return None + elif attr_name == 'Certificate Identifier': + return None + elif attr_name == 'Certificate Subject': + return None + elif attr_name == 'Certificate Issuer': + return None + elif attr_name == 'Digital Signature Algorithm': + return None + elif attr_name == 'Digest': + return None + elif attr_name == 'Operation Policy Name': + return managed_object.operation_policy_name + elif attr_name == 'Cryptographic Usage Mask': + return managed_object.cryptographic_usage_masks + elif attr_name == 'Lease Time': + return None + elif attr_name == 'Usage Limits': + return None + elif attr_name == 'State': + return managed_object.state + elif attr_name == 'Initial Date': + return None + elif attr_name == 'Activation Date': + return None + elif attr_name == 'Process Start Date': + return None + elif attr_name == 'Protect Stop Date': + return None + elif attr_name == 'Deactivation Date': + return None + elif attr_name == 'Destroy Date': + return None + elif attr_name == 'Compromise Occurrence Date': + return None + elif attr_name == 'Compromise Date': + return None + elif attr_name == 'Revocation Reason': + return None + elif attr_name == 'Archive Date': + return None + elif attr_name == 'Object Group': + return None + elif attr_name == 'Fresh': + return None + elif attr_name == 'Link': + return None + elif attr_name == 'Application Specific Information': + return None + elif attr_name == 'Contact Information': + return None + elif attr_name == 'Last Change Date': + return None + else: + # Since custom attribute names are possible, just return None + # for unrecognized attributes. This satisfies the spec. + return None + def _set_attributes_on_managed_object(self, managed_object, attributes): """ Given a kmip.pie object and a dictionary of attributes, attempt to set @@ -761,6 +905,8 @@ class KmipEngine(object): return self._process_register(payload) elif operation == enums.Operation.GET: return self._process_get(payload) + elif operation == enums.Operation.GET_ATTRIBUTES: + return self._process_get_attributes(payload) elif operation == enums.Operation.DESTROY: return self._process_destroy(payload) elif operation == enums.Operation.QUERY: @@ -1187,6 +1333,47 @@ class KmipEngine(object): return response_payload + @_kmip_version_supported('1.0') + def _process_get_attributes(self, payload): + self._logger.info("Processing operation: GetAttributes") + + if payload.unique_identifier: + unique_identifier = payload.unique_identifier + else: + unique_identifier = self._id_placeholder + + object_type = self._get_object_type(unique_identifier) + + managed_object = self._data_session.query(object_type).filter( + object_type.unique_identifier == unique_identifier + ).one() + + # Determine if the request should be carried out under the object's + # operation policy. If not, feign ignorance of the object. + is_allowed = self._is_allowed_by_operation_policy( + managed_object.operation_policy_name, + self._client_identity, + managed_object._owner, + managed_object._object_type, + enums.Operation.GET_ATTRIBUTES + ) + if not is_allowed: + raise exceptions.ItemNotFound( + "Could not locate object: {0}".format(unique_identifier) + ) + + attrs = self._get_attributes_from_managed_object( + managed_object, + payload.attribute_names + ) + + response_payload = get_attributes.GetAttributesResponsePayload( + unique_identifier=unique_identifier, + attributes=attrs + ) + + return response_payload + @_kmip_version_supported('1.0') def _process_activate(self, payload): self._logger.info("Processing operation: Activate") @@ -1308,6 +1495,7 @@ class KmipEngine(object): contents.Operation(enums.Operation.CREATE_KEY_PAIR), contents.Operation(enums.Operation.REGISTER), contents.Operation(enums.Operation.GET), + contents.Operation(enums.Operation.GET_ATTRIBUTES), contents.Operation(enums.Operation.DESTROY), contents.Operation(enums.Operation.QUERY) ]) diff --git a/kmip/services/server/policy.py b/kmip/services/server/policy.py index 5c40153..760081c 100644 --- a/kmip/services/server/policy.py +++ b/kmip/services/server/policy.py @@ -1146,3 +1146,12 @@ class AttributePolicy(object): # TODO (peterhamilton) Handle multivalue swap between certificate types rule_set = self._attribute_rule_sets.get(attribute) return rule_set.multiple_instances_permitted + + def get_all_attribute_names(self): + """ + Get a list of all supported attribute names. + + Returns: + list: A list of string attribute names. + """ + return self._attribute_rule_sets.keys() diff --git a/kmip/tests/unit/core/messages/payloads/test_get_attributes.py b/kmip/tests/unit/core/messages/payloads/test_get_attributes.py index 569d2d4..b5ac650 100644 --- a/kmip/tests/unit/core/messages/payloads/test_get_attributes.py +++ b/kmip/tests/unit/core/messages/payloads/test_get_attributes.py @@ -46,7 +46,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): b'\x69\x6F\x6E\x00\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x09' b'\x78\x2D\x50\x75\x72\x70\x6F\x73\x65\x00\x00\x00\x00\x00\x00\x00' ) - self.encoding_sans_uid = utils.BytearrayStream( + self.encoding_sans_unique_identifier = utils.BytearrayStream( b'\x42\x00\x79\x01\x00\x00\x00\x78\x42\x00\x0A\x07\x00\x00\x00\x0C' b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00' b'\x42\x00\x0A\x07\x00\x00\x00\x20\x41\x70\x70\x6C\x69\x63\x61\x74' @@ -66,7 +66,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): b'\x42\x00\x79\x01\x00\x00\x00\x00' ) - self.uid = '1703250b-4d40-4de2-93a0-c494a1d4ae40' + self.unique_identifier = '1703250b-4d40-4de2-93a0-c494a1d4ae40' self.attribute_names = [ 'Object Group', 'Application Specific Information', @@ -90,41 +90,41 @@ class TestGetAttributesRequestPayload(testtools.TestCase): valid value. """ get_attributes.GetAttributesRequestPayload( - 'test-uid', + 'test-unique-identifier', ['test-attribute-name-1', 'test-attribute-name-2'] ) - def test_uid(self): + def test_unique_identifier(self): """ - Test that the uid attribute of a GetAttributes request payload can - be properly set and retrieved. + Test that the unique_identifier attribute of a GetAttributes request + payload can be properly set and retrieved. """ payload = get_attributes.GetAttributesRequestPayload() - self.assertIsNone(payload.uid) - self.assertIsNone(payload._uid) + self.assertIsNone(payload.unique_identifier) + self.assertIsNone(payload._unique_identifier) - payload.uid = 'test-uid' + payload.unique_identifier = 'test-unique-identifier' - self.assertEqual('test-uid', payload.uid) + self.assertEqual('test-unique-identifier', payload.unique_identifier) self.assertEqual( primitives.TextString( - value='test-uid', + value='test-unique-identifier', tag=enums.Tags.UNIQUE_IDENTIFIER ), - payload._uid + payload._unique_identifier ) - def test_uid_with_invalid_value(self): + def test_unique_identifier_with_invalid_value(self): """ Test that a TypeError is raised when an invalid ID is used to set - the uid attribute of a GetAttributes request payload. + the unique_identifier attribute of a GetAttributes request payload. """ payload = get_attributes.GetAttributesRequestPayload() - args = (payload, 'uid', 0) + args = (payload, 'unique_identifier', 0) self.assertRaisesRegexp( TypeError, - "uid must be a string", + "unique identifier must be a string", setattr, *args ) @@ -239,18 +239,18 @@ class TestGetAttributesRequestPayload(testtools.TestCase): """ payload = get_attributes.GetAttributesRequestPayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attribute_names) payload.read(self.full_encoding) - self.assertEqual(self.uid, payload.uid) + self.assertEqual(self.unique_identifier, payload.unique_identifier) self.assertEqual( primitives.TextString( - value=self.uid, + value=self.unique_identifier, tag=enums.Tags.UNIQUE_IDENTIFIER ), - payload._uid + payload._unique_identifier ) self.assertEqual( set(self.attribute_names), @@ -265,20 +265,20 @@ class TestGetAttributesRequestPayload(testtools.TestCase): payload._attribute_names ) - def test_read_with_no_uid(self): + def test_read_with_no_unique_identifier(self): """ Test that a GetAttributes request payload with no ID can be read from a data stream. """ payload = get_attributes.GetAttributesRequestPayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attribute_names) - payload.read(self.encoding_sans_uid) + payload.read(self.encoding_sans_unique_identifier) - self.assertEqual(None, payload.uid) - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload._unique_identifier) self.assertEqual( set(self.attribute_names), set(payload.attribute_names) @@ -299,18 +299,18 @@ class TestGetAttributesRequestPayload(testtools.TestCase): """ payload = get_attributes.GetAttributesRequestPayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attribute_names) payload.read(self.encoding_sans_attribute_names) - self.assertEqual(self.uid, payload.uid) + self.assertEqual(self.unique_identifier, payload.unique_identifier) self.assertEqual( primitives.TextString( - value=self.uid, + value=self.unique_identifier, tag=enums.Tags.UNIQUE_IDENTIFIER ), - payload._uid + payload._unique_identifier ) self.assertEqual(list(), payload.attribute_names) self.assertEqual(list(), payload._attribute_names) @@ -322,13 +322,13 @@ class TestGetAttributesRequestPayload(testtools.TestCase): """ payload = get_attributes.GetAttributesRequestPayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attribute_names) payload.read(self.empty_encoding) - self.assertEqual(None, payload.uid) - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload.attribute_names) self.assertEqual(list(), payload._attribute_names) @@ -338,7 +338,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): stream. """ payload = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) stream = utils.BytearrayStream() @@ -347,7 +347,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): self.assertEqual(len(self.full_encoding), len(stream)) self.assertEqual(str(self.full_encoding), str(stream)) - def test_write_with_no_uid(self): + def test_write_with_no_unique_identifier(self): """ Test that a GetAttributes request payload with no ID can be written to a data stream. @@ -359,8 +359,14 @@ class TestGetAttributesRequestPayload(testtools.TestCase): stream = utils.BytearrayStream() payload.write(stream) - self.assertEqual(len(self.encoding_sans_uid), len(stream)) - self.assertEqual(str(self.encoding_sans_uid), str(stream)) + self.assertEqual( + len(self.encoding_sans_unique_identifier), + len(stream) + ) + self.assertEqual( + str(self.encoding_sans_unique_identifier), + str(stream) + ) def test_write_with_no_attribute_names(self): """ @@ -368,7 +374,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): can be written to a data stream. """ payload = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, None ) stream = utils.BytearrayStream() @@ -394,21 +400,23 @@ class TestGetAttributesRequestPayload(testtools.TestCase): Test that repr can be applied to a GetAttributes request payload. """ payload = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) - uid = "uid={0}".format(payload.uid) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) attribute_names = "attribute_names={0}".format( payload.attribute_names ) expected = "GetAttributesRequestPayload({0}, {1})".format( - uid, + unique_identifier, attribute_names ) observed = repr(payload) self.assertEqual(expected, observed) - def test_repr_with_no_uid(self): + def test_repr_with_no_unique_identifier(self): """ Test that repr can be applied to a GetAttributes request payload with no ID. @@ -417,12 +425,14 @@ class TestGetAttributesRequestPayload(testtools.TestCase): None, self.attribute_names ) - uid = "uid={0}".format(payload.uid) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) attribute_names = "attribute_names={0}".format( payload.attribute_names ) expected = "GetAttributesRequestPayload({0}, {1})".format( - uid, + unique_identifier, attribute_names ) observed = repr(payload) @@ -434,15 +444,17 @@ class TestGetAttributesRequestPayload(testtools.TestCase): no attribute names. """ payload = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, None ) - uid = "uid={0}".format(payload.uid) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) attribute_names = "attribute_names={0}".format( payload.attribute_names ) expected = "GetAttributesRequestPayload({0}, {1})".format( - uid, + unique_identifier, attribute_names ) observed = repr(payload) @@ -457,12 +469,14 @@ class TestGetAttributesRequestPayload(testtools.TestCase): None, None ) - uid = "uid={0}".format(payload.uid) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) attribute_names = "attribute_names={0}".format( payload.attribute_names ) expected = "GetAttributesRequestPayload({0}, {1})".format( - uid, + unique_identifier, attribute_names ) observed = repr(payload) @@ -473,11 +487,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): Test that str can be applied to a GetAttributes request payload. """ payload = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) expected = str({ - 'uid': self.uid, + 'unique_identifier': self.unique_identifier, 'attribute_names': self.attribute_names }) observed = str(payload) @@ -493,7 +507,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): self.attribute_names ) expected = str({ - 'uid': None, + 'unique_identifier': None, 'attribute_names': self.attribute_names }) observed = str(payload) @@ -505,11 +519,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): no attribute names. """ payload = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, None ) expected = str({ - 'uid': self.uid, + 'unique_identifier': self.unique_identifier, 'attribute_names': list() }) observed = str(payload) @@ -525,7 +539,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): None ) expected = str({ - 'uid': None, + 'unique_identifier': None, 'attribute_names': list() }) observed = str(payload) @@ -537,11 +551,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): GetAttributes request payloads with the same data. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) @@ -555,25 +569,25 @@ class TestGetAttributesRequestPayload(testtools.TestCase): but with different attribute name orderings. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) self.attribute_names.reverse() b = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) self.assertTrue(a == b) self.assertTrue(b == a) - def test_equal_on_not_equal_uid(self): + def test_equal_on_not_equal_unique_identifier(self): """ Test that the equality operator returns False when comparing two GetAttributes request payloads with different IDs. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = get_attributes.GetAttributesRequestPayload( @@ -590,11 +604,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): GetAttributes request payloads with different attribute names. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, None ) @@ -608,7 +622,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): payload. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = "invalid" @@ -622,24 +636,24 @@ class TestGetAttributesRequestPayload(testtools.TestCase): two GetAttributes request payloads with the same internal data. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) self.assertFalse(a != b) self.assertFalse(b != a) - def test_not_equal_on_not_equal_uid(self): + def test_not_equal_on_not_equal_unique_identifier(self): """ Test that the inequality operator returns True when comparing two GetAttributes request payloads with different IDs. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = get_attributes.GetAttributesRequestPayload( @@ -656,11 +670,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): GetAttributes request payloads with different attribute names. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, None ) @@ -674,7 +688,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): payload. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = "invalid" @@ -715,7 +729,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): b'\x42\x00\x0B\x07\x00\x00\x00\x0D\x64\x65\x6D\x6F\x6E\x73\x74\x72' b'\x61\x74\x69\x6F\x6E\x00\x00\x00' ) - self.encoding_sans_uid = utils.BytearrayStream( + self.encoding_sans_unique_identifier = utils.BytearrayStream( b'\x42\x00\x7C\x01\x00\x00\x01\x00\x42\x00\x08\x01\x00\x00\x00\x28' b'\x42\x00\x0A\x07\x00\x00\x00\x0C\x4F\x62\x6A\x65\x63\x74\x20\x47' b'\x72\x6F\x75\x70\x00\x00\x00\x00\x42\x00\x0B\x07\x00\x00\x00\x06' @@ -741,7 +755,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): b'\x61\x65\x34\x30\x00\x00\x00\x00' ) - self.uid = '1703250b-4d40-4de2-93a0-c494a1d4ae40' + self.unique_identifier = '1703250b-4d40-4de2-93a0-c494a1d4ae40' self.attributes = [ objects.Attribute( attribute_name=objects.Attribute.AttributeName( @@ -785,41 +799,41 @@ class TestGetAttributesResponsePayload(testtools.TestCase): valid value. """ get_attributes.GetAttributesResponsePayload( - 'test-uid', + 'test-unique-identifier', [objects.Attribute(), objects.Attribute()] ) - def test_uid(self): + def test_unique_identifier(self): """ - Test that the uid attribute of a GetAttributes response payload can - be properly set and retrieved. + Test that the unique_identifier attribute of a GetAttributes response + payload can be properly set and retrieved. """ payload = get_attributes.GetAttributesResponsePayload() - self.assertIsNone(payload.uid) - self.assertIsNone(payload._uid) + self.assertIsNone(payload.unique_identifier) + self.assertIsNone(payload._unique_identifier) - payload.uid = 'test-uid' + payload.unique_identifier = 'test-unique-identifier' - self.assertEqual('test-uid', payload.uid) + self.assertEqual('test-unique-identifier', payload.unique_identifier) self.assertEqual( primitives.TextString( - value='test-uid', + value='test-unique-identifier', tag=enums.Tags.UNIQUE_IDENTIFIER ), - payload._uid + payload._unique_identifier ) - def test_uid_with_invalid_value(self): + def test_unique_identifier_with_invalid_value(self): """ Test that a TypeError is raised when an invalid ID is used to set - the uid attribute of a GetAttributes response payload. + the unique_identifier attribute of a GetAttributes response payload. """ payload = get_attributes.GetAttributesResponsePayload() - args = (payload, 'uid', 0) + args = (payload, 'unique_identifier', 0) self.assertRaisesRegexp( TypeError, - "uid must be a string", + "unique identifier must be a string", setattr, *args ) @@ -885,18 +899,18 @@ class TestGetAttributesResponsePayload(testtools.TestCase): """ payload = get_attributes.GetAttributesResponsePayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attributes) payload.read(self.full_encoding) - self.assertEqual(self.uid, payload.uid) + self.assertEqual(self.unique_identifier, payload.unique_identifier) self.assertEqual( primitives.TextString( - value=self.uid, + value=self.unique_identifier, tag=enums.Tags.UNIQUE_IDENTIFIER ), - payload._uid + payload._unique_identifier ) self.assertEqual( len(self.attributes), @@ -908,20 +922,20 @@ class TestGetAttributesResponsePayload(testtools.TestCase): payload._attributes ) - def test_read_with_no_uid(self): + def test_read_with_no_unique_identifier(self): """ Test that an InvalidKmipEncoding error gets raised when attempting to read a GetAttributes response encoding with no ID data. """ payload = get_attributes.GetAttributesResponsePayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attributes) - args = (self.encoding_sans_uid, ) + args = (self.encoding_sans_unique_identifier, ) self.assertRaisesRegexp( exceptions.InvalidKmipEncoding, - "expected GetAttributes response uid not found", + "expected GetAttributes response unique identifier not found", payload.read, *args ) @@ -933,18 +947,18 @@ class TestGetAttributesResponsePayload(testtools.TestCase): """ payload = get_attributes.GetAttributesResponsePayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attributes) payload.read(self.encoding_sans_attributes) - self.assertEqual(self.uid, payload.uid) + self.assertEqual(self.unique_identifier, payload.unique_identifier) self.assertEqual( primitives.TextString( - value=self.uid, + value=self.unique_identifier, tag=enums.Tags.UNIQUE_IDENTIFIER ), - payload._uid + payload._unique_identifier ) self.assertEqual(list(), payload.attributes) self.assertEqual(list(), payload._attributes) @@ -955,7 +969,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): stream. """ payload = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) stream = utils.BytearrayStream() @@ -964,7 +978,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): self.assertEqual(len(self.full_encoding), len(stream)) self.assertEqual(str(self.full_encoding), str(stream)) - def test_write_with_no_uid(self): + def test_write_with_no_unique_identifier(self): """ Test that a GetAttributes request payload with no ID can be written to a data stream. @@ -978,7 +992,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): args = (stream, ) self.assertRaisesRegexp( exceptions.InvalidField, - "The GetAttributes response uid is required.", + "The GetAttributes response unique identifier is required.", payload.write, *args ) @@ -989,7 +1003,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): data can be written to a data stream. """ payload = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, None ) stream = utils.BytearrayStream() @@ -1003,15 +1017,17 @@ class TestGetAttributesResponsePayload(testtools.TestCase): Test that repr can be applied to a GetAttributes response payload. """ payload = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) - uid = "uid={0}".format(payload.uid) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) payload_attributes = "attributes={0}".format( payload.attributes ) expected = "GetAttributesResponsePayload({0}, {1})".format( - uid, + unique_identifier, payload_attributes ) observed = repr(payload) @@ -1022,11 +1038,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): Test that str can be applied to a GetAttributes response payload. """ payload = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) expected = str({ - 'uid': self.uid, + 'unique_identifier': self.unique_identifier, 'attributes': self.attributes }) observed = str(payload) @@ -1038,24 +1054,24 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with the same data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) self.assertTrue(a == b) self.assertTrue(b == a) - def test_equal_on_not_equal_uid(self): + def test_equal_on_not_equal_unique_identifier(self): """ Test that the equality operator returns False when comparing two GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = get_attributes.GetAttributesResponsePayload( @@ -1072,13 +1088,13 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) reversed_attributes = copy.deepcopy(self.attributes) reversed_attributes.reverse() b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, reversed_attributes ) @@ -1091,11 +1107,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, list() ) @@ -1108,11 +1124,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, None ) b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) @@ -1126,7 +1142,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): payload. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = 'invalid' @@ -1140,24 +1156,24 @@ class TestGetAttributesResponsePayload(testtools.TestCase): two GetAttributes response payloads with the same internal data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) self.assertFalse(a != b) self.assertFalse(b != a) - def test_not_equal_on_not_equal_uid(self): + def test_not_equal_on_not_equal_unique_identifier(self): """ Test that the inequality operator returns True when comparing two GetAttributes request payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = get_attributes.GetAttributesResponsePayload( @@ -1174,13 +1190,13 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) reversed_attributes = copy.deepcopy(self.attributes) reversed_attributes.reverse() b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, reversed_attributes ) @@ -1193,11 +1209,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, list() ) @@ -1210,11 +1226,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, None ) b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) @@ -1228,7 +1244,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): payload. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = "invalid" diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 23f3624..0957eec 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -43,6 +43,7 @@ from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get +from kmip.core.messages.payloads import get_attributes from kmip.core.messages.payloads import query from kmip.core.messages.payloads import register @@ -859,6 +860,7 @@ class TestKmipEngine(testtools.TestCase): e._process_create_key_pair = mock.MagicMock() e._process_register = mock.MagicMock() e._process_get = mock.MagicMock() + e._process_get_attributes = mock.MagicMock() e._process_destroy = mock.MagicMock() e._process_query = mock.MagicMock() e._process_discover_versions = mock.MagicMock() @@ -867,6 +869,7 @@ class TestKmipEngine(testtools.TestCase): e._process_operation(enums.Operation.CREATE_KEY_PAIR, None) e._process_operation(enums.Operation.REGISTER, None) e._process_operation(enums.Operation.GET, None) + e._process_operation(enums.Operation.GET_ATTRIBUTES, None) e._process_operation(enums.Operation.DESTROY, None) e._process_operation(enums.Operation.QUERY, None) e._process_operation(enums.Operation.DISCOVER_VERSIONS, None) @@ -875,6 +878,7 @@ class TestKmipEngine(testtools.TestCase): e._process_create_key_pair.assert_called_with(None) e._process_register.assert_called_with(None) e._process_get.assert_called_with(None) + e._process_get_attributes.assert_called_with(None) e._process_destroy.assert_called_with(None) e._process_query.assert_called_with(None) e._process_discover_versions.assert_called_with(None) @@ -945,7 +949,6 @@ class TestKmipEngine(testtools.TestCase): e._logger.warning.assert_called_once_with( "Could not identify object type for object: 1" ) - self.assertTrue(e._logger.exception.called) def test_get_object_type_multiple_objects(self): """ @@ -1352,6 +1355,345 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_get_attributes_from_managed_object(self): + """ + Test that multiple attributes can be retrieved from a given managed + object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + symmetric_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'', + masks=[enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT] + ) + symmetric_key.names = ['Name 1', 'Name 2'] + + e._data_session.add(symmetric_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + result = e._get_attributes_from_managed_object( + symmetric_key, + ['Unique Identifier', + 'Name', + 'Cryptographic Algorithm', + 'Cryptographic Length', + 'Cryptographic Usage Mask', + 'invalid'] + ) + attribute_factory = factory.AttributeFactory() + + self.assertEqual(6, len(result)) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.UNIQUE_IDENTIFIER, + '1' + ) + self.assertIn(attribute, result) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + self.assertIn(attribute, result) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 0 + ) + self.assertIn(attribute, result) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT] + ) + self.assertIn(attribute, result) + + def test_get_attribute_from_managed_object(self): + """ + Test that an attribute can be retrieved from a given managed object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + symmetric_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'', + masks=[enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT] + ) + certificate = pie_objects.X509Certificate( + b'' + ) + opaque_object = pie_objects.OpaqueObject( + b'', + enums.OpaqueDataType.NONE + ) + + e._data_session.add(symmetric_key) + e._data_session.add(certificate) + e._data_session.add(opaque_object) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Unique Identifier' + ) + self.assertEqual('1', result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Name' + ) + self.assertEqual( + [attributes.Name( + attributes.Name.NameValue('Symmetric Key'), + attributes.Name.NameType( + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + )], + result + ) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Object Type' + ) + self.assertEqual(enums.ObjectType.SYMMETRIC_KEY, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Cryptographic Algorithm' + ) + self.assertEqual(enums.CryptographicAlgorithm.AES, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Cryptographic Length' + ) + self.assertEqual(0, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Cryptographic Parameters' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Cryptographic Domain Parameters' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Certificate Type' + ) + self.assertEqual(enums.CertificateTypeEnum.X_509, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Certificate Length' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'X.509 Certificate Identifier' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'X.509 Certificate Subject' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'X.509 Certificate Issuer' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Certificate Identifier' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Certificate Subject' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Certificate Issuer' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Digital Signature Algorithm' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + opaque_object, + 'Digest' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Operation Policy Name' + ) + self.assertEqual('default', result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Cryptographic Usage Mask' + ) + self.assertEqual( + [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT], + result + ) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Lease Time' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Usage Limits' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'State' + ) + self.assertEqual(enums.State.PRE_ACTIVE, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Initial Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Activation Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Process Start Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Protect Stop Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Deactivation Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Destroy Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Compromise Occurrence Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Compromise Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Revocation Reason' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Archive Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Object Group' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Fresh' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Link' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Application Specific Information' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Contact Information' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Last Change Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'invalid' + ) + self.assertEqual(None, result) + def test_set_attributes_on_managed_object(self): """ Test that multiple attributes can be set on a given managed object. @@ -3254,6 +3596,178 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_get_attributes(self): + """ + Test that a GetAttributes request can be processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + payload = get_attributes.GetAttributesRequestPayload( + unique_identifier='1', + attribute_names=['Object Type', 'Cryptographic Algorithm'] + ) + + response_payload = e._process_get_attributes(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: GetAttributes" + ) + self.assertEqual( + '1', + response_payload.unique_identifier + ) + self.assertEqual( + 2, + len(response_payload.attributes) + ) + + attribute_factory = factory.AttributeFactory() + + attribute = attribute_factory.create_attribute( + enums.AttributeType.OBJECT_TYPE, + enums.ObjectType.SYMMETRIC_KEY + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + self.assertIn(attribute, response_payload.attributes) + + def test_get_attributes_with_no_arguments(self): + """ + Test that a GetAttributes request with no arguments can be processed + correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + e._id_placeholder = '1' + + payload = get_attributes.GetAttributesRequestPayload() + + response_payload = e._process_get_attributes(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: GetAttributes" + ) + self.assertEqual( + '1', + response_payload.unique_identifier + ) + self.assertEqual( + 8, + len(response_payload.attributes) + ) + + attribute_factory = factory.AttributeFactory() + + attribute = attribute_factory.create_attribute( + enums.AttributeType.OBJECT_TYPE, + enums.ObjectType.SYMMETRIC_KEY + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 0 + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.OPERATION_POLICY_NAME, + 'default' + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [] + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.STATE, + enums.State.PRE_ACTIVE + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.UNIQUE_IDENTIFIER, + '1' + ) + self.assertIn(attribute, response_payload.attributes) + + def test_get_attributes_not_allowed_by_policy(self): + """ + Test that an unallowed request is handled correctly by GetAttributes. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._client_identity = 'test' + + obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) + obj_a._owner = 'admin' + + e._data_session.add(obj_a) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + payload = get_attributes.GetAttributesRequestPayload( + unique_identifier=id_a + ) + + # Test by specifying the ID of the object whose attributes should + # be retrieved. + args = [payload] + self.assertRaisesRegex( + exceptions.ItemNotFound, + "Could not locate object: {0}".format(id_a), + e._process_get_attributes, + *args + ) + def test_activate(self): """ Test that an Activate request can be processed correctly. @@ -3565,7 +4079,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(6, len(result.operations)) + self.assertEqual(7, len(result.operations)) self.assertEqual( enums.Operation.CREATE, result.operations[0].value @@ -3583,13 +4097,17 @@ class TestKmipEngine(testtools.TestCase): result.operations[3].value ) self.assertEqual( - enums.Operation.DESTROY, + enums.Operation.GET_ATTRIBUTES, result.operations[4].value ) self.assertEqual( - enums.Operation.QUERY, + enums.Operation.DESTROY, result.operations[5].value ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[6].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -3608,7 +4126,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsNotNone(result.operations) - self.assertEqual(7, len(result.operations)) + self.assertEqual(8, len(result.operations)) self.assertEqual( enums.Operation.DISCOVER_VERSIONS, result.operations[-1].value diff --git a/kmip/tests/unit/services/server/test_policy.py b/kmip/tests/unit/services/server/test_policy.py index 542d99d..42b902e 100644 --- a/kmip/tests/unit/services/server/test_policy.py +++ b/kmip/tests/unit/services/server/test_policy.py @@ -112,3 +112,57 @@ class TestAttributePolicy(testtools.TestCase): result = rules.is_attribute_multivalued(attribute_b) self.assertTrue(result) + + def test_get_all_attribute_names(self): + """ + Test that get_all_attribute_names returns a complete list of the + names of all spec-defined attributes. + """ + rules = policy.AttributePolicy(contents.ProtocolVersion.create(1, 0)) + attribute_names = [ + 'Unique Identifier', + 'Name', + 'Object Type', + 'Cryptographic Algorithm', + 'Cryptographic Length', + 'Cryptographic Parameters', + 'Cryptographic Domain Parameters', + 'Certificate Type', + 'Certificate Length', + 'X.509 Certificate Identifier', + 'X.509 Certificate Subject', + 'X.509 Certificate Issuer', + 'Certificate Identifier', + 'Certificate Subject', + 'Certificate Issuer', + 'Digital Signature Algorithm', + 'Digest', + 'Operation Policy Name', + 'Cryptographic Usage Mask', + 'Lease Time', + 'Usage Limits', + 'State', + 'Initial Date', + 'Activation Date', + 'Process Start Date', + 'Protect Stop Date', + 'Deactivation Date', + 'Destroy Date', + 'Compromise Occurrence Date', + 'Compromise Date', + 'Revocation Reason', + 'Archive Date', + 'Object Group', + 'Fresh', + 'Link', + 'Application Specific Information', + 'Contact Information', + 'Last Change Date', + 'Custom Attribute' + ] + + result = rules.get_all_attribute_names() + + self.assertEqual(len(attribute_names), len(result)) + for attribute_name in attribute_names: + self.assertIn(attribute_name, result)