diff --git a/kmip/core/messages/payloads/get_attributes.py b/kmip/core/messages/payloads/get_attributes.py index 530f6f7..fd6e05d 100644 --- a/kmip/core/messages/payloads/get_attributes.py +++ b/kmip/core/messages/payloads/get_attributes.py @@ -31,19 +31,19 @@ class GetAttributesRequestPayload(primitives.Struct): duplicates in the attribute name list. Attributes: - uid: The unique ID of the managed object with which the retrieved - attributes should be associated. + unique_identifier: The unique ID of the managed object with which the + retrieved attributes should be associated. attribute_names: A list of strings identifying the names of the attributes associated with the managed object. """ - def __init__(self, uid=None, attribute_names=None): + def __init__(self, unique_identifier=None, attribute_names=None): """ Construct a GetAttributes request payload. Args: - uid (string): The ID of the managed object with which the - retrieved attributes should be associated. Optional, defaults - to None. + unique_identifier (string): The ID of the managed object with + which the retrieved attributes should be associated. Optional, + defaults to None. attribute_names: A list of strings identifying the names of the attributes associated with the managed object. Optional, defaults to None. @@ -51,30 +51,30 @@ class GetAttributesRequestPayload(primitives.Struct): super(GetAttributesRequestPayload, self).__init__( enums.Tags.REQUEST_PAYLOAD) - self._uid = None + self._unique_identifier = None self._attribute_names = list() - self.uid = uid + self.unique_identifier = unique_identifier self.attribute_names = attribute_names @property - def uid(self): - if self._uid: - return self._uid.value + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value else: - return self._uid + return self._unique_identifier - @uid.setter - def uid(self, value): + @unique_identifier.setter + def unique_identifier(self, value): if value is None: - self._uid = None + self._unique_identifier = None elif isinstance(value, six.string_types): - self._uid = primitives.TextString( + self._unique_identifier = primitives.TextString( value=value, tag=enums.Tags.UNIQUE_IDENTIFIER ) else: - raise TypeError("uid must be a string") + raise TypeError("unique identifier must be a string") @property def attribute_names(self): @@ -125,10 +125,12 @@ class GetAttributesRequestPayload(primitives.Struct): tstream = utils.BytearrayStream(istream.read(self.length)) if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream): - self._uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER) - self._uid.read(tstream) + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(tstream) else: - self._uid = None + self._unique_identifier = None names = list() while self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, tstream): @@ -150,8 +152,8 @@ class GetAttributesRequestPayload(primitives.Struct): """ tstream = utils.BytearrayStream() - if self._uid: - self._uid.write(tstream) + if self._unique_identifier: + self._unique_identifier.write(tstream) for attribute_name in self._attribute_names: attribute_name.write(tstream) @@ -161,22 +163,24 @@ class GetAttributesRequestPayload(primitives.Struct): ostream.write(tstream.buffer) def __repr__(self): - uid = "uid={0}".format(self.uid) + unique_identifier = "unique_identifier={0}".format( + self.unique_identifier + ) attribute_names = "attribute_names={0}".format(self.attribute_names) return "GetAttributesRequestPayload({0}, {1})".format( - uid, + unique_identifier, attribute_names ) def __str__(self): return str({ - 'uid': self.uid, + 'unique_identifier': self.unique_identifier, 'attribute_names': self.attribute_names }) def __eq__(self, other): if isinstance(other, GetAttributesRequestPayload): - if self.uid == other.uid: + if self.unique_identifier == other.unique_identifier: if set(self.attribute_names) == set(other.attribute_names): return True else: @@ -202,49 +206,49 @@ class GetAttributesResponsePayload(primitives.Struct): associated with the aforementioned managed object. Attributes: - uid: The unique ID of the managed object with which the retrieved - attributes should be associated. + unique_identifier: The unique ID of the managed object with which + the retrieved attributes should be associated. attributes: The list of attributes associated with managed object - identified by the uid above. + identified by the unique identifier above. """ - def __init__(self, uid=None, attributes=None): + def __init__(self, unique_identifier=None, attributes=None): """ Construct a GetAttributes response payload. Args: - uid (string): The ID of the managed object with which the - retrieved attributes should be associated. Optional, defaults - to None. + unique_identifier (string): The ID of the managed object with + which the retrieved attributes should be associated. Optional, + defaults to None. attributes (list): A list of attribute structures associated with the managed object. Optional, defaults to None. """ super(GetAttributesResponsePayload, self).__init__( enums.Tags.RESPONSE_PAYLOAD) - self._uid = None + self._unique_identifier = None self._attributes = list() - self.uid = uid + self.unique_identifier = unique_identifier self.attributes = attributes @property - def uid(self): - if self._uid: - return self._uid.value + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value else: - return self._uid + return self._unique_identifier - @uid.setter - def uid(self, value): + @unique_identifier.setter + def unique_identifier(self, value): if value is None: - self._uid = None + self._unique_identifier = None elif isinstance(value, six.string_types): - self._uid = primitives.TextString( + self._unique_identifier = primitives.TextString( value=value, tag=enums.Tags.UNIQUE_IDENTIFIER ) else: - raise TypeError("uid must be a string") + raise TypeError("unique identifier must be a string") @property def attributes(self): @@ -279,12 +283,14 @@ class GetAttributesResponsePayload(primitives.Struct): tstream = utils.BytearrayStream(istream.read(self.length)) if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream): - uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER) - uid.read(tstream) - self.uid = uid.value + unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + unique_identifier.read(tstream) + self.unique_identifier = unique_identifier.value else: raise exceptions.InvalidKmipEncoding( - "expected GetAttributes response uid not found" + "expected GetAttributes response unique identifier not found" ) self._attributes = list() @@ -306,11 +312,11 @@ class GetAttributesResponsePayload(primitives.Struct): """ tstream = utils.BytearrayStream() - if self._uid: - self._uid.write(tstream) + if self._unique_identifier: + self._unique_identifier.write(tstream) else: raise exceptions.InvalidField( - "The GetAttributes response uid is required." + "The GetAttributes response unique identifier is required." ) for attribute in self._attributes: @@ -321,16 +327,24 @@ class GetAttributesResponsePayload(primitives.Struct): ostream.write(tstream.buffer) def __repr__(self): - uid = "uid={0}".format(self.uid) + unique_identifier = "unique_identifier={0}".format( + self.unique_identifier + ) names = "attributes={0}".format(self.attributes) - return "GetAttributesResponsePayload({0}, {1})".format(uid, names) + return "GetAttributesResponsePayload({0}, {1})".format( + unique_identifier, + names + ) def __str__(self): - return str({'uid': self.uid, 'attributes': self.attributes}) + return str({ + 'unique_identifier': self.unique_identifier, + 'attributes': self.attributes + }) def __eq__(self, other): if isinstance(other, GetAttributesResponsePayload): - if self.uid != other.uid: + if self.unique_identifier != other.unique_identifier: return False if len(self._attributes) != len(other._attributes): return False diff --git a/kmip/core/primitives.py b/kmip/core/primitives.py index a95f013..8dc7730 100644 --- a/kmip/core/primitives.py +++ b/kmip/core/primitives.py @@ -825,11 +825,10 @@ class TextString(Base): def __validate(self): if self.value is not None: - data_type = type(self.value) - if data_type is not str: + if not isinstance(self.value, six.string_types): msg = ErrorStrings.BAD_EXP_RECV raise TypeError(msg.format('TextString', 'value', str, - data_type)) + type(self.value))) def __repr__(self): return "{0}(value={1})".format(type(self).__name__, repr(self.value)) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 27e7341..f62d24a 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -29,6 +29,8 @@ import kmip from kmip.core import attributes from kmip.core import enums from kmip.core import exceptions + +from kmip.core.factories import attributes as attribute_factory from kmip.core.factories import secrets from kmip.core.messages import contents @@ -40,6 +42,7 @@ from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get +from kmip.core.messages.payloads import get_attributes from kmip.core.messages.payloads import query from kmip.core.messages.payloads import register @@ -492,7 +495,6 @@ class KmipEngine(object): unique_identifier ) ) - self._logger.exception(e) raise exceptions.ItemNotFound( "Could not locate object: {0}".format(unique_identifier) ) @@ -624,6 +626,148 @@ class KmipEngine(object): return attributes + def _get_attributes_from_managed_object(self, managed_object, attr_names): + """ + Given a kmip.pie object and a list of attribute names, attempt to get + all of the existing attribute values from the object. + """ + attr_factory = attribute_factory.AttributeFactory() + retrieved_attributes = list() + + if not attr_names: + attr_names = self._attribute_policy.get_all_attribute_names() + + for attribute_name in attr_names: + object_type = managed_object._object_type + + if not self._attribute_policy.is_attribute_supported( + attribute_name + ): + continue + + if self._attribute_policy.is_attribute_applicable_to_object_type( + attribute_name, + object_type + ): + attribute_value = self._get_attribute_from_managed_object( + managed_object, + attribute_name + ) + + if attribute_value is not None: + if self._attribute_policy.is_attribute_multivalued( + attribute_name + ): + for count, value in enumerate(attribute_value): + attribute = attr_factory.create_attribute( + enums.AttributeType(attribute_name), + value, + count + ) + retrieved_attributes.append(attribute) + else: + attribute = attr_factory.create_attribute( + enums.AttributeType(attribute_name), + attribute_value + ) + retrieved_attributes.append(attribute) + + return retrieved_attributes + + def _get_attribute_from_managed_object(self, managed_object, attr_name): + """ + Get the attribute value from the kmip.pie managed object. + """ + if attr_name == 'Unique Identifier': + return str(managed_object.unique_identifier) + elif attr_name == 'Name': + names = list() + for name in managed_object.names: + name = attributes.Name( + attributes.Name.NameValue(name), + attributes.Name.NameType( + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + names.append(name) + return names + elif attr_name == 'Object Type': + return managed_object._object_type + elif attr_name == 'Cryptographic Algorithm': + return managed_object.cryptographic_algorithm + elif attr_name == 'Cryptographic Length': + return managed_object.cryptographic_length + elif attr_name == 'Cryptographic Parameters': + return None + elif attr_name == 'Cryptographic Domain Parameters': + return None + elif attr_name == 'Certificate Type': + return managed_object.certificate_type + elif attr_name == 'Certificate Length': + return None + elif attr_name == 'X.509 Certificate Identifier': + return None + elif attr_name == 'X.509 Certificate Subject': + return None + elif attr_name == 'X.509 Certificate Issuer': + return None + elif attr_name == 'Certificate Identifier': + return None + elif attr_name == 'Certificate Subject': + return None + elif attr_name == 'Certificate Issuer': + return None + elif attr_name == 'Digital Signature Algorithm': + return None + elif attr_name == 'Digest': + return None + elif attr_name == 'Operation Policy Name': + return managed_object.operation_policy_name + elif attr_name == 'Cryptographic Usage Mask': + return managed_object.cryptographic_usage_masks + elif attr_name == 'Lease Time': + return None + elif attr_name == 'Usage Limits': + return None + elif attr_name == 'State': + return managed_object.state + elif attr_name == 'Initial Date': + return None + elif attr_name == 'Activation Date': + return None + elif attr_name == 'Process Start Date': + return None + elif attr_name == 'Protect Stop Date': + return None + elif attr_name == 'Deactivation Date': + return None + elif attr_name == 'Destroy Date': + return None + elif attr_name == 'Compromise Occurrence Date': + return None + elif attr_name == 'Compromise Date': + return None + elif attr_name == 'Revocation Reason': + return None + elif attr_name == 'Archive Date': + return None + elif attr_name == 'Object Group': + return None + elif attr_name == 'Fresh': + return None + elif attr_name == 'Link': + return None + elif attr_name == 'Application Specific Information': + return None + elif attr_name == 'Contact Information': + return None + elif attr_name == 'Last Change Date': + return None + else: + # Since custom attribute names are possible, just return None + # for unrecognized attributes. This satisfies the spec. + return None + def _set_attributes_on_managed_object(self, managed_object, attributes): """ Given a kmip.pie object and a dictionary of attributes, attempt to set @@ -761,6 +905,8 @@ class KmipEngine(object): return self._process_register(payload) elif operation == enums.Operation.GET: return self._process_get(payload) + elif operation == enums.Operation.GET_ATTRIBUTES: + return self._process_get_attributes(payload) elif operation == enums.Operation.DESTROY: return self._process_destroy(payload) elif operation == enums.Operation.QUERY: @@ -1187,6 +1333,47 @@ class KmipEngine(object): return response_payload + @_kmip_version_supported('1.0') + def _process_get_attributes(self, payload): + self._logger.info("Processing operation: GetAttributes") + + if payload.unique_identifier: + unique_identifier = payload.unique_identifier + else: + unique_identifier = self._id_placeholder + + object_type = self._get_object_type(unique_identifier) + + managed_object = self._data_session.query(object_type).filter( + object_type.unique_identifier == unique_identifier + ).one() + + # Determine if the request should be carried out under the object's + # operation policy. If not, feign ignorance of the object. + is_allowed = self._is_allowed_by_operation_policy( + managed_object.operation_policy_name, + self._client_identity, + managed_object._owner, + managed_object._object_type, + enums.Operation.GET_ATTRIBUTES + ) + if not is_allowed: + raise exceptions.ItemNotFound( + "Could not locate object: {0}".format(unique_identifier) + ) + + attrs = self._get_attributes_from_managed_object( + managed_object, + payload.attribute_names + ) + + response_payload = get_attributes.GetAttributesResponsePayload( + unique_identifier=unique_identifier, + attributes=attrs + ) + + return response_payload + @_kmip_version_supported('1.0') def _process_activate(self, payload): self._logger.info("Processing operation: Activate") @@ -1308,6 +1495,7 @@ class KmipEngine(object): contents.Operation(enums.Operation.CREATE_KEY_PAIR), contents.Operation(enums.Operation.REGISTER), contents.Operation(enums.Operation.GET), + contents.Operation(enums.Operation.GET_ATTRIBUTES), contents.Operation(enums.Operation.DESTROY), contents.Operation(enums.Operation.QUERY) ]) diff --git a/kmip/services/server/policy.py b/kmip/services/server/policy.py index 5c40153..760081c 100644 --- a/kmip/services/server/policy.py +++ b/kmip/services/server/policy.py @@ -1146,3 +1146,12 @@ class AttributePolicy(object): # TODO (peterhamilton) Handle multivalue swap between certificate types rule_set = self._attribute_rule_sets.get(attribute) return rule_set.multiple_instances_permitted + + def get_all_attribute_names(self): + """ + Get a list of all supported attribute names. + + Returns: + list: A list of string attribute names. + """ + return self._attribute_rule_sets.keys() diff --git a/kmip/tests/unit/core/messages/payloads/test_get_attributes.py b/kmip/tests/unit/core/messages/payloads/test_get_attributes.py index 569d2d4..b5ac650 100644 --- a/kmip/tests/unit/core/messages/payloads/test_get_attributes.py +++ b/kmip/tests/unit/core/messages/payloads/test_get_attributes.py @@ -46,7 +46,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): b'\x69\x6F\x6E\x00\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x09' b'\x78\x2D\x50\x75\x72\x70\x6F\x73\x65\x00\x00\x00\x00\x00\x00\x00' ) - self.encoding_sans_uid = utils.BytearrayStream( + self.encoding_sans_unique_identifier = utils.BytearrayStream( b'\x42\x00\x79\x01\x00\x00\x00\x78\x42\x00\x0A\x07\x00\x00\x00\x0C' b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00' b'\x42\x00\x0A\x07\x00\x00\x00\x20\x41\x70\x70\x6C\x69\x63\x61\x74' @@ -66,7 +66,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): b'\x42\x00\x79\x01\x00\x00\x00\x00' ) - self.uid = '1703250b-4d40-4de2-93a0-c494a1d4ae40' + self.unique_identifier = '1703250b-4d40-4de2-93a0-c494a1d4ae40' self.attribute_names = [ 'Object Group', 'Application Specific Information', @@ -90,41 +90,41 @@ class TestGetAttributesRequestPayload(testtools.TestCase): valid value. """ get_attributes.GetAttributesRequestPayload( - 'test-uid', + 'test-unique-identifier', ['test-attribute-name-1', 'test-attribute-name-2'] ) - def test_uid(self): + def test_unique_identifier(self): """ - Test that the uid attribute of a GetAttributes request payload can - be properly set and retrieved. + Test that the unique_identifier attribute of a GetAttributes request + payload can be properly set and retrieved. """ payload = get_attributes.GetAttributesRequestPayload() - self.assertIsNone(payload.uid) - self.assertIsNone(payload._uid) + self.assertIsNone(payload.unique_identifier) + self.assertIsNone(payload._unique_identifier) - payload.uid = 'test-uid' + payload.unique_identifier = 'test-unique-identifier' - self.assertEqual('test-uid', payload.uid) + self.assertEqual('test-unique-identifier', payload.unique_identifier) self.assertEqual( primitives.TextString( - value='test-uid', + value='test-unique-identifier', tag=enums.Tags.UNIQUE_IDENTIFIER ), - payload._uid + payload._unique_identifier ) - def test_uid_with_invalid_value(self): + def test_unique_identifier_with_invalid_value(self): """ Test that a TypeError is raised when an invalid ID is used to set - the uid attribute of a GetAttributes request payload. + the unique_identifier attribute of a GetAttributes request payload. """ payload = get_attributes.GetAttributesRequestPayload() - args = (payload, 'uid', 0) + args = (payload, 'unique_identifier', 0) self.assertRaisesRegexp( TypeError, - "uid must be a string", + "unique identifier must be a string", setattr, *args ) @@ -239,18 +239,18 @@ class TestGetAttributesRequestPayload(testtools.TestCase): """ payload = get_attributes.GetAttributesRequestPayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attribute_names) payload.read(self.full_encoding) - self.assertEqual(self.uid, payload.uid) + self.assertEqual(self.unique_identifier, payload.unique_identifier) self.assertEqual( primitives.TextString( - value=self.uid, + value=self.unique_identifier, tag=enums.Tags.UNIQUE_IDENTIFIER ), - payload._uid + payload._unique_identifier ) self.assertEqual( set(self.attribute_names), @@ -265,20 +265,20 @@ class TestGetAttributesRequestPayload(testtools.TestCase): payload._attribute_names ) - def test_read_with_no_uid(self): + def test_read_with_no_unique_identifier(self): """ Test that a GetAttributes request payload with no ID can be read from a data stream. """ payload = get_attributes.GetAttributesRequestPayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attribute_names) - payload.read(self.encoding_sans_uid) + payload.read(self.encoding_sans_unique_identifier) - self.assertEqual(None, payload.uid) - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload._unique_identifier) self.assertEqual( set(self.attribute_names), set(payload.attribute_names) @@ -299,18 +299,18 @@ class TestGetAttributesRequestPayload(testtools.TestCase): """ payload = get_attributes.GetAttributesRequestPayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attribute_names) payload.read(self.encoding_sans_attribute_names) - self.assertEqual(self.uid, payload.uid) + self.assertEqual(self.unique_identifier, payload.unique_identifier) self.assertEqual( primitives.TextString( - value=self.uid, + value=self.unique_identifier, tag=enums.Tags.UNIQUE_IDENTIFIER ), - payload._uid + payload._unique_identifier ) self.assertEqual(list(), payload.attribute_names) self.assertEqual(list(), payload._attribute_names) @@ -322,13 +322,13 @@ class TestGetAttributesRequestPayload(testtools.TestCase): """ payload = get_attributes.GetAttributesRequestPayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attribute_names) payload.read(self.empty_encoding) - self.assertEqual(None, payload.uid) - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload.attribute_names) self.assertEqual(list(), payload._attribute_names) @@ -338,7 +338,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): stream. """ payload = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) stream = utils.BytearrayStream() @@ -347,7 +347,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): self.assertEqual(len(self.full_encoding), len(stream)) self.assertEqual(str(self.full_encoding), str(stream)) - def test_write_with_no_uid(self): + def test_write_with_no_unique_identifier(self): """ Test that a GetAttributes request payload with no ID can be written to a data stream. @@ -359,8 +359,14 @@ class TestGetAttributesRequestPayload(testtools.TestCase): stream = utils.BytearrayStream() payload.write(stream) - self.assertEqual(len(self.encoding_sans_uid), len(stream)) - self.assertEqual(str(self.encoding_sans_uid), str(stream)) + self.assertEqual( + len(self.encoding_sans_unique_identifier), + len(stream) + ) + self.assertEqual( + str(self.encoding_sans_unique_identifier), + str(stream) + ) def test_write_with_no_attribute_names(self): """ @@ -368,7 +374,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): can be written to a data stream. """ payload = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, None ) stream = utils.BytearrayStream() @@ -394,21 +400,23 @@ class TestGetAttributesRequestPayload(testtools.TestCase): Test that repr can be applied to a GetAttributes request payload. """ payload = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) - uid = "uid={0}".format(payload.uid) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) attribute_names = "attribute_names={0}".format( payload.attribute_names ) expected = "GetAttributesRequestPayload({0}, {1})".format( - uid, + unique_identifier, attribute_names ) observed = repr(payload) self.assertEqual(expected, observed) - def test_repr_with_no_uid(self): + def test_repr_with_no_unique_identifier(self): """ Test that repr can be applied to a GetAttributes request payload with no ID. @@ -417,12 +425,14 @@ class TestGetAttributesRequestPayload(testtools.TestCase): None, self.attribute_names ) - uid = "uid={0}".format(payload.uid) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) attribute_names = "attribute_names={0}".format( payload.attribute_names ) expected = "GetAttributesRequestPayload({0}, {1})".format( - uid, + unique_identifier, attribute_names ) observed = repr(payload) @@ -434,15 +444,17 @@ class TestGetAttributesRequestPayload(testtools.TestCase): no attribute names. """ payload = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, None ) - uid = "uid={0}".format(payload.uid) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) attribute_names = "attribute_names={0}".format( payload.attribute_names ) expected = "GetAttributesRequestPayload({0}, {1})".format( - uid, + unique_identifier, attribute_names ) observed = repr(payload) @@ -457,12 +469,14 @@ class TestGetAttributesRequestPayload(testtools.TestCase): None, None ) - uid = "uid={0}".format(payload.uid) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) attribute_names = "attribute_names={0}".format( payload.attribute_names ) expected = "GetAttributesRequestPayload({0}, {1})".format( - uid, + unique_identifier, attribute_names ) observed = repr(payload) @@ -473,11 +487,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): Test that str can be applied to a GetAttributes request payload. """ payload = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) expected = str({ - 'uid': self.uid, + 'unique_identifier': self.unique_identifier, 'attribute_names': self.attribute_names }) observed = str(payload) @@ -493,7 +507,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): self.attribute_names ) expected = str({ - 'uid': None, + 'unique_identifier': None, 'attribute_names': self.attribute_names }) observed = str(payload) @@ -505,11 +519,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): no attribute names. """ payload = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, None ) expected = str({ - 'uid': self.uid, + 'unique_identifier': self.unique_identifier, 'attribute_names': list() }) observed = str(payload) @@ -525,7 +539,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): None ) expected = str({ - 'uid': None, + 'unique_identifier': None, 'attribute_names': list() }) observed = str(payload) @@ -537,11 +551,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): GetAttributes request payloads with the same data. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) @@ -555,25 +569,25 @@ class TestGetAttributesRequestPayload(testtools.TestCase): but with different attribute name orderings. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) self.attribute_names.reverse() b = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) self.assertTrue(a == b) self.assertTrue(b == a) - def test_equal_on_not_equal_uid(self): + def test_equal_on_not_equal_unique_identifier(self): """ Test that the equality operator returns False when comparing two GetAttributes request payloads with different IDs. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = get_attributes.GetAttributesRequestPayload( @@ -590,11 +604,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): GetAttributes request payloads with different attribute names. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, None ) @@ -608,7 +622,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): payload. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = "invalid" @@ -622,24 +636,24 @@ class TestGetAttributesRequestPayload(testtools.TestCase): two GetAttributes request payloads with the same internal data. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) self.assertFalse(a != b) self.assertFalse(b != a) - def test_not_equal_on_not_equal_uid(self): + def test_not_equal_on_not_equal_unique_identifier(self): """ Test that the inequality operator returns True when comparing two GetAttributes request payloads with different IDs. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = get_attributes.GetAttributesRequestPayload( @@ -656,11 +670,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): GetAttributes request payloads with different attribute names. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, None ) @@ -674,7 +688,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): payload. """ a = get_attributes.GetAttributesRequestPayload( - self.uid, + self.unique_identifier, self.attribute_names ) b = "invalid" @@ -715,7 +729,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): b'\x42\x00\x0B\x07\x00\x00\x00\x0D\x64\x65\x6D\x6F\x6E\x73\x74\x72' b'\x61\x74\x69\x6F\x6E\x00\x00\x00' ) - self.encoding_sans_uid = utils.BytearrayStream( + self.encoding_sans_unique_identifier = utils.BytearrayStream( b'\x42\x00\x7C\x01\x00\x00\x01\x00\x42\x00\x08\x01\x00\x00\x00\x28' b'\x42\x00\x0A\x07\x00\x00\x00\x0C\x4F\x62\x6A\x65\x63\x74\x20\x47' b'\x72\x6F\x75\x70\x00\x00\x00\x00\x42\x00\x0B\x07\x00\x00\x00\x06' @@ -741,7 +755,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): b'\x61\x65\x34\x30\x00\x00\x00\x00' ) - self.uid = '1703250b-4d40-4de2-93a0-c494a1d4ae40' + self.unique_identifier = '1703250b-4d40-4de2-93a0-c494a1d4ae40' self.attributes = [ objects.Attribute( attribute_name=objects.Attribute.AttributeName( @@ -785,41 +799,41 @@ class TestGetAttributesResponsePayload(testtools.TestCase): valid value. """ get_attributes.GetAttributesResponsePayload( - 'test-uid', + 'test-unique-identifier', [objects.Attribute(), objects.Attribute()] ) - def test_uid(self): + def test_unique_identifier(self): """ - Test that the uid attribute of a GetAttributes response payload can - be properly set and retrieved. + Test that the unique_identifier attribute of a GetAttributes response + payload can be properly set and retrieved. """ payload = get_attributes.GetAttributesResponsePayload() - self.assertIsNone(payload.uid) - self.assertIsNone(payload._uid) + self.assertIsNone(payload.unique_identifier) + self.assertIsNone(payload._unique_identifier) - payload.uid = 'test-uid' + payload.unique_identifier = 'test-unique-identifier' - self.assertEqual('test-uid', payload.uid) + self.assertEqual('test-unique-identifier', payload.unique_identifier) self.assertEqual( primitives.TextString( - value='test-uid', + value='test-unique-identifier', tag=enums.Tags.UNIQUE_IDENTIFIER ), - payload._uid + payload._unique_identifier ) - def test_uid_with_invalid_value(self): + def test_unique_identifier_with_invalid_value(self): """ Test that a TypeError is raised when an invalid ID is used to set - the uid attribute of a GetAttributes response payload. + the unique_identifier attribute of a GetAttributes response payload. """ payload = get_attributes.GetAttributesResponsePayload() - args = (payload, 'uid', 0) + args = (payload, 'unique_identifier', 0) self.assertRaisesRegexp( TypeError, - "uid must be a string", + "unique identifier must be a string", setattr, *args ) @@ -885,18 +899,18 @@ class TestGetAttributesResponsePayload(testtools.TestCase): """ payload = get_attributes.GetAttributesResponsePayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attributes) payload.read(self.full_encoding) - self.assertEqual(self.uid, payload.uid) + self.assertEqual(self.unique_identifier, payload.unique_identifier) self.assertEqual( primitives.TextString( - value=self.uid, + value=self.unique_identifier, tag=enums.Tags.UNIQUE_IDENTIFIER ), - payload._uid + payload._unique_identifier ) self.assertEqual( len(self.attributes), @@ -908,20 +922,20 @@ class TestGetAttributesResponsePayload(testtools.TestCase): payload._attributes ) - def test_read_with_no_uid(self): + def test_read_with_no_unique_identifier(self): """ Test that an InvalidKmipEncoding error gets raised when attempting to read a GetAttributes response encoding with no ID data. """ payload = get_attributes.GetAttributesResponsePayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attributes) - args = (self.encoding_sans_uid, ) + args = (self.encoding_sans_unique_identifier, ) self.assertRaisesRegexp( exceptions.InvalidKmipEncoding, - "expected GetAttributes response uid not found", + "expected GetAttributes response unique identifier not found", payload.read, *args ) @@ -933,18 +947,18 @@ class TestGetAttributesResponsePayload(testtools.TestCase): """ payload = get_attributes.GetAttributesResponsePayload() - self.assertEqual(None, payload._uid) + self.assertEqual(None, payload._unique_identifier) self.assertEqual(list(), payload._attributes) payload.read(self.encoding_sans_attributes) - self.assertEqual(self.uid, payload.uid) + self.assertEqual(self.unique_identifier, payload.unique_identifier) self.assertEqual( primitives.TextString( - value=self.uid, + value=self.unique_identifier, tag=enums.Tags.UNIQUE_IDENTIFIER ), - payload._uid + payload._unique_identifier ) self.assertEqual(list(), payload.attributes) self.assertEqual(list(), payload._attributes) @@ -955,7 +969,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): stream. """ payload = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) stream = utils.BytearrayStream() @@ -964,7 +978,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): self.assertEqual(len(self.full_encoding), len(stream)) self.assertEqual(str(self.full_encoding), str(stream)) - def test_write_with_no_uid(self): + def test_write_with_no_unique_identifier(self): """ Test that a GetAttributes request payload with no ID can be written to a data stream. @@ -978,7 +992,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): args = (stream, ) self.assertRaisesRegexp( exceptions.InvalidField, - "The GetAttributes response uid is required.", + "The GetAttributes response unique identifier is required.", payload.write, *args ) @@ -989,7 +1003,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): data can be written to a data stream. """ payload = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, None ) stream = utils.BytearrayStream() @@ -1003,15 +1017,17 @@ class TestGetAttributesResponsePayload(testtools.TestCase): Test that repr can be applied to a GetAttributes response payload. """ payload = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) - uid = "uid={0}".format(payload.uid) + unique_identifier = "unique_identifier={0}".format( + payload.unique_identifier + ) payload_attributes = "attributes={0}".format( payload.attributes ) expected = "GetAttributesResponsePayload({0}, {1})".format( - uid, + unique_identifier, payload_attributes ) observed = repr(payload) @@ -1022,11 +1038,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): Test that str can be applied to a GetAttributes response payload. """ payload = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) expected = str({ - 'uid': self.uid, + 'unique_identifier': self.unique_identifier, 'attributes': self.attributes }) observed = str(payload) @@ -1038,24 +1054,24 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with the same data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) self.assertTrue(a == b) self.assertTrue(b == a) - def test_equal_on_not_equal_uid(self): + def test_equal_on_not_equal_unique_identifier(self): """ Test that the equality operator returns False when comparing two GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = get_attributes.GetAttributesResponsePayload( @@ -1072,13 +1088,13 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) reversed_attributes = copy.deepcopy(self.attributes) reversed_attributes.reverse() b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, reversed_attributes ) @@ -1091,11 +1107,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, list() ) @@ -1108,11 +1124,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, None ) b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) @@ -1126,7 +1142,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): payload. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = 'invalid' @@ -1140,24 +1156,24 @@ class TestGetAttributesResponsePayload(testtools.TestCase): two GetAttributes response payloads with the same internal data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) self.assertFalse(a != b) self.assertFalse(b != a) - def test_not_equal_on_not_equal_uid(self): + def test_not_equal_on_not_equal_unique_identifier(self): """ Test that the inequality operator returns True when comparing two GetAttributes request payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = get_attributes.GetAttributesResponsePayload( @@ -1174,13 +1190,13 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) reversed_attributes = copy.deepcopy(self.attributes) reversed_attributes.reverse() b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, reversed_attributes ) @@ -1193,11 +1209,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, list() ) @@ -1210,11 +1226,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): GetAttributes response payloads with different data. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, None ) b = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) @@ -1228,7 +1244,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): payload. """ a = get_attributes.GetAttributesResponsePayload( - self.uid, + self.unique_identifier, self.attributes ) b = "invalid" diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 23f3624..0957eec 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -43,6 +43,7 @@ from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get +from kmip.core.messages.payloads import get_attributes from kmip.core.messages.payloads import query from kmip.core.messages.payloads import register @@ -859,6 +860,7 @@ class TestKmipEngine(testtools.TestCase): e._process_create_key_pair = mock.MagicMock() e._process_register = mock.MagicMock() e._process_get = mock.MagicMock() + e._process_get_attributes = mock.MagicMock() e._process_destroy = mock.MagicMock() e._process_query = mock.MagicMock() e._process_discover_versions = mock.MagicMock() @@ -867,6 +869,7 @@ class TestKmipEngine(testtools.TestCase): e._process_operation(enums.Operation.CREATE_KEY_PAIR, None) e._process_operation(enums.Operation.REGISTER, None) e._process_operation(enums.Operation.GET, None) + e._process_operation(enums.Operation.GET_ATTRIBUTES, None) e._process_operation(enums.Operation.DESTROY, None) e._process_operation(enums.Operation.QUERY, None) e._process_operation(enums.Operation.DISCOVER_VERSIONS, None) @@ -875,6 +878,7 @@ class TestKmipEngine(testtools.TestCase): e._process_create_key_pair.assert_called_with(None) e._process_register.assert_called_with(None) e._process_get.assert_called_with(None) + e._process_get_attributes.assert_called_with(None) e._process_destroy.assert_called_with(None) e._process_query.assert_called_with(None) e._process_discover_versions.assert_called_with(None) @@ -945,7 +949,6 @@ class TestKmipEngine(testtools.TestCase): e._logger.warning.assert_called_once_with( "Could not identify object type for object: 1" ) - self.assertTrue(e._logger.exception.called) def test_get_object_type_multiple_objects(self): """ @@ -1352,6 +1355,345 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_get_attributes_from_managed_object(self): + """ + Test that multiple attributes can be retrieved from a given managed + object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + symmetric_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'', + masks=[enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT] + ) + symmetric_key.names = ['Name 1', 'Name 2'] + + e._data_session.add(symmetric_key) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + result = e._get_attributes_from_managed_object( + symmetric_key, + ['Unique Identifier', + 'Name', + 'Cryptographic Algorithm', + 'Cryptographic Length', + 'Cryptographic Usage Mask', + 'invalid'] + ) + attribute_factory = factory.AttributeFactory() + + self.assertEqual(6, len(result)) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.UNIQUE_IDENTIFIER, + '1' + ) + self.assertIn(attribute, result) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + self.assertIn(attribute, result) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 0 + ) + self.assertIn(attribute, result) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT] + ) + self.assertIn(attribute, result) + + def test_get_attribute_from_managed_object(self): + """ + Test that an attribute can be retrieved from a given managed object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + symmetric_key = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'', + masks=[enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT] + ) + certificate = pie_objects.X509Certificate( + b'' + ) + opaque_object = pie_objects.OpaqueObject( + b'', + enums.OpaqueDataType.NONE + ) + + e._data_session.add(symmetric_key) + e._data_session.add(certificate) + e._data_session.add(opaque_object) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Unique Identifier' + ) + self.assertEqual('1', result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Name' + ) + self.assertEqual( + [attributes.Name( + attributes.Name.NameValue('Symmetric Key'), + attributes.Name.NameType( + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + )], + result + ) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Object Type' + ) + self.assertEqual(enums.ObjectType.SYMMETRIC_KEY, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Cryptographic Algorithm' + ) + self.assertEqual(enums.CryptographicAlgorithm.AES, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Cryptographic Length' + ) + self.assertEqual(0, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Cryptographic Parameters' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Cryptographic Domain Parameters' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Certificate Type' + ) + self.assertEqual(enums.CertificateTypeEnum.X_509, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Certificate Length' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'X.509 Certificate Identifier' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'X.509 Certificate Subject' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'X.509 Certificate Issuer' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Certificate Identifier' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Certificate Subject' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Certificate Issuer' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + certificate, + 'Digital Signature Algorithm' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + opaque_object, + 'Digest' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Operation Policy Name' + ) + self.assertEqual('default', result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Cryptographic Usage Mask' + ) + self.assertEqual( + [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT], + result + ) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Lease Time' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Usage Limits' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'State' + ) + self.assertEqual(enums.State.PRE_ACTIVE, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Initial Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Activation Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Process Start Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Protect Stop Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Deactivation Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Destroy Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Compromise Occurrence Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Compromise Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Revocation Reason' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Archive Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Object Group' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Fresh' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Link' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Application Specific Information' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Contact Information' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'Last Change Date' + ) + self.assertEqual(None, result) + + result = e._get_attribute_from_managed_object( + symmetric_key, + 'invalid' + ) + self.assertEqual(None, result) + def test_set_attributes_on_managed_object(self): """ Test that multiple attributes can be set on a given managed object. @@ -3254,6 +3596,178 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_get_attributes(self): + """ + Test that a GetAttributes request can be processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + payload = get_attributes.GetAttributesRequestPayload( + unique_identifier='1', + attribute_names=['Object Type', 'Cryptographic Algorithm'] + ) + + response_payload = e._process_get_attributes(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: GetAttributes" + ) + self.assertEqual( + '1', + response_payload.unique_identifier + ) + self.assertEqual( + 2, + len(response_payload.attributes) + ) + + attribute_factory = factory.AttributeFactory() + + attribute = attribute_factory.create_attribute( + enums.AttributeType.OBJECT_TYPE, + enums.ObjectType.SYMMETRIC_KEY + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + self.assertIn(attribute, response_payload.attributes) + + def test_get_attributes_with_no_arguments(self): + """ + Test that a GetAttributes request with no arguments can be processed + correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + e._id_placeholder = '1' + + payload = get_attributes.GetAttributesRequestPayload() + + response_payload = e._process_get_attributes(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: GetAttributes" + ) + self.assertEqual( + '1', + response_payload.unique_identifier + ) + self.assertEqual( + 8, + len(response_payload.attributes) + ) + + attribute_factory = factory.AttributeFactory() + + attribute = attribute_factory.create_attribute( + enums.AttributeType.OBJECT_TYPE, + enums.ObjectType.SYMMETRIC_KEY + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 0 + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.OPERATION_POLICY_NAME, + 'default' + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [] + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.STATE, + enums.State.PRE_ACTIVE + ) + self.assertIn(attribute, response_payload.attributes) + + attribute = attribute_factory.create_attribute( + enums.AttributeType.UNIQUE_IDENTIFIER, + '1' + ) + self.assertIn(attribute, response_payload.attributes) + + def test_get_attributes_not_allowed_by_policy(self): + """ + Test that an unallowed request is handled correctly by GetAttributes. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + e._client_identity = 'test' + + obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) + obj_a._owner = 'admin' + + e._data_session.add(obj_a) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + payload = get_attributes.GetAttributesRequestPayload( + unique_identifier=id_a + ) + + # Test by specifying the ID of the object whose attributes should + # be retrieved. + args = [payload] + self.assertRaisesRegex( + exceptions.ItemNotFound, + "Could not locate object: {0}".format(id_a), + e._process_get_attributes, + *args + ) + def test_activate(self): """ Test that an Activate request can be processed correctly. @@ -3565,7 +4079,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(6, len(result.operations)) + self.assertEqual(7, len(result.operations)) self.assertEqual( enums.Operation.CREATE, result.operations[0].value @@ -3583,13 +4097,17 @@ class TestKmipEngine(testtools.TestCase): result.operations[3].value ) self.assertEqual( - enums.Operation.DESTROY, + enums.Operation.GET_ATTRIBUTES, result.operations[4].value ) self.assertEqual( - enums.Operation.QUERY, + enums.Operation.DESTROY, result.operations[5].value ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[6].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -3608,7 +4126,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsNotNone(result.operations) - self.assertEqual(7, len(result.operations)) + self.assertEqual(8, len(result.operations)) self.assertEqual( enums.Operation.DISCOVER_VERSIONS, result.operations[-1].value diff --git a/kmip/tests/unit/services/server/test_policy.py b/kmip/tests/unit/services/server/test_policy.py index 542d99d..42b902e 100644 --- a/kmip/tests/unit/services/server/test_policy.py +++ b/kmip/tests/unit/services/server/test_policy.py @@ -112,3 +112,57 @@ class TestAttributePolicy(testtools.TestCase): result = rules.is_attribute_multivalued(attribute_b) self.assertTrue(result) + + def test_get_all_attribute_names(self): + """ + Test that get_all_attribute_names returns a complete list of the + names of all spec-defined attributes. + """ + rules = policy.AttributePolicy(contents.ProtocolVersion.create(1, 0)) + attribute_names = [ + 'Unique Identifier', + 'Name', + 'Object Type', + 'Cryptographic Algorithm', + 'Cryptographic Length', + 'Cryptographic Parameters', + 'Cryptographic Domain Parameters', + 'Certificate Type', + 'Certificate Length', + 'X.509 Certificate Identifier', + 'X.509 Certificate Subject', + 'X.509 Certificate Issuer', + 'Certificate Identifier', + 'Certificate Subject', + 'Certificate Issuer', + 'Digital Signature Algorithm', + 'Digest', + 'Operation Policy Name', + 'Cryptographic Usage Mask', + 'Lease Time', + 'Usage Limits', + 'State', + 'Initial Date', + 'Activation Date', + 'Process Start Date', + 'Protect Stop Date', + 'Deactivation Date', + 'Destroy Date', + 'Compromise Occurrence Date', + 'Compromise Date', + 'Revocation Reason', + 'Archive Date', + 'Object Group', + 'Fresh', + 'Link', + 'Application Specific Information', + 'Contact Information', + 'Last Change Date', + 'Custom Attribute' + ] + + result = rules.get_all_attribute_names() + + self.assertEqual(len(attribute_names), len(result)) + for attribute_name in attribute_names: + self.assertIn(attribute_name, result)