Merge pull request #221 from OpenKMIP/feat/add-get-attributes-to-server
Adding server support for the GetAttributes operation
This commit is contained in:
		| @@ -31,19 +31,19 @@ class GetAttributesRequestPayload(primitives.Struct): | ||||
|     duplicates in the attribute name list. | ||||
|  | ||||
|     Attributes: | ||||
|         uid: The unique ID of the managed object with which the retrieved | ||||
|             attributes should be associated. | ||||
|         unique_identifier: The unique ID of the managed object with which the | ||||
|             retrieved attributes should be associated. | ||||
|         attribute_names: A list of strings identifying the names of the | ||||
|             attributes associated with the managed object. | ||||
|     """ | ||||
|     def __init__(self, uid=None, attribute_names=None): | ||||
|     def __init__(self, unique_identifier=None, attribute_names=None): | ||||
|         """ | ||||
|         Construct a GetAttributes request payload. | ||||
|  | ||||
|         Args: | ||||
|             uid (string): The ID of the managed object with which the | ||||
|                 retrieved attributes should be associated. Optional, defaults | ||||
|                 to None. | ||||
|             unique_identifier (string): The ID of the managed object with | ||||
|                 which the retrieved attributes should be associated. Optional, | ||||
|                 defaults to None. | ||||
|             attribute_names: A list of strings identifying the names of the | ||||
|                 attributes associated with the managed object. Optional, | ||||
|                 defaults to None. | ||||
| @@ -51,30 +51,30 @@ class GetAttributesRequestPayload(primitives.Struct): | ||||
|         super(GetAttributesRequestPayload, self).__init__( | ||||
|             enums.Tags.REQUEST_PAYLOAD) | ||||
|  | ||||
|         self._uid = None | ||||
|         self._unique_identifier = None | ||||
|         self._attribute_names = list() | ||||
|  | ||||
|         self.uid = uid | ||||
|         self.unique_identifier = unique_identifier | ||||
|         self.attribute_names = attribute_names | ||||
|  | ||||
|     @property | ||||
|     def uid(self): | ||||
|         if self._uid: | ||||
|             return self._uid.value | ||||
|     def unique_identifier(self): | ||||
|         if self._unique_identifier: | ||||
|             return self._unique_identifier.value | ||||
|         else: | ||||
|             return self._uid | ||||
|             return self._unique_identifier | ||||
|  | ||||
|     @uid.setter | ||||
|     def uid(self, value): | ||||
|     @unique_identifier.setter | ||||
|     def unique_identifier(self, value): | ||||
|         if value is None: | ||||
|             self._uid = None | ||||
|             self._unique_identifier = None | ||||
|         elif isinstance(value, six.string_types): | ||||
|             self._uid = primitives.TextString( | ||||
|             self._unique_identifier = primitives.TextString( | ||||
|                 value=value, | ||||
|                 tag=enums.Tags.UNIQUE_IDENTIFIER | ||||
|             ) | ||||
|         else: | ||||
|             raise TypeError("uid must be a string") | ||||
|             raise TypeError("unique identifier must be a string") | ||||
|  | ||||
|     @property | ||||
|     def attribute_names(self): | ||||
| @@ -125,10 +125,12 @@ class GetAttributesRequestPayload(primitives.Struct): | ||||
|         tstream = utils.BytearrayStream(istream.read(self.length)) | ||||
|  | ||||
|         if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream): | ||||
|             self._uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER) | ||||
|             self._uid.read(tstream) | ||||
|             self._unique_identifier = primitives.TextString( | ||||
|                 tag=enums.Tags.UNIQUE_IDENTIFIER | ||||
|             ) | ||||
|             self._unique_identifier.read(tstream) | ||||
|         else: | ||||
|             self._uid = None | ||||
|             self._unique_identifier = None | ||||
|  | ||||
|         names = list() | ||||
|         while self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, tstream): | ||||
| @@ -150,8 +152,8 @@ class GetAttributesRequestPayload(primitives.Struct): | ||||
|         """ | ||||
|         tstream = utils.BytearrayStream() | ||||
|  | ||||
|         if self._uid: | ||||
|             self._uid.write(tstream) | ||||
|         if self._unique_identifier: | ||||
|             self._unique_identifier.write(tstream) | ||||
|  | ||||
|         for attribute_name in self._attribute_names: | ||||
|             attribute_name.write(tstream) | ||||
| @@ -161,22 +163,24 @@ class GetAttributesRequestPayload(primitives.Struct): | ||||
|         ostream.write(tstream.buffer) | ||||
|  | ||||
|     def __repr__(self): | ||||
|         uid = "uid={0}".format(self.uid) | ||||
|         unique_identifier = "unique_identifier={0}".format( | ||||
|             self.unique_identifier | ||||
|         ) | ||||
|         attribute_names = "attribute_names={0}".format(self.attribute_names) | ||||
|         return "GetAttributesRequestPayload({0}, {1})".format( | ||||
|             uid, | ||||
|             unique_identifier, | ||||
|             attribute_names | ||||
|         ) | ||||
|  | ||||
|     def __str__(self): | ||||
|         return str({ | ||||
|             'uid': self.uid, | ||||
|             'unique_identifier': self.unique_identifier, | ||||
|             'attribute_names': self.attribute_names | ||||
|         }) | ||||
|  | ||||
|     def __eq__(self, other): | ||||
|         if isinstance(other, GetAttributesRequestPayload): | ||||
|             if self.uid == other.uid: | ||||
|             if self.unique_identifier == other.unique_identifier: | ||||
|                 if set(self.attribute_names) == set(other.attribute_names): | ||||
|                     return True | ||||
|                 else: | ||||
| @@ -202,49 +206,49 @@ class GetAttributesResponsePayload(primitives.Struct): | ||||
|     associated with the aforementioned managed object. | ||||
|  | ||||
|     Attributes: | ||||
|         uid: The unique ID of the managed object with which the retrieved | ||||
|             attributes should be associated. | ||||
|         unique_identifier: The unique ID of the managed object with which | ||||
|             the retrieved attributes should be associated. | ||||
|         attributes: The list of attributes associated with managed object | ||||
|             identified by the uid above. | ||||
|             identified by the unique identifier above. | ||||
|     """ | ||||
|     def __init__(self, uid=None, attributes=None): | ||||
|     def __init__(self, unique_identifier=None, attributes=None): | ||||
|         """ | ||||
|         Construct a GetAttributes response payload. | ||||
|  | ||||
|         Args: | ||||
|             uid (string): The ID of the managed object with which the | ||||
|                 retrieved attributes should be associated. Optional, defaults | ||||
|                 to None. | ||||
|             unique_identifier (string): The ID of the managed object with | ||||
|                 which the retrieved attributes should be associated. Optional, | ||||
|                 defaults to None. | ||||
|             attributes (list): A list of attribute structures associated with | ||||
|                 the managed object. Optional, defaults to None. | ||||
|         """ | ||||
|         super(GetAttributesResponsePayload, self).__init__( | ||||
|             enums.Tags.RESPONSE_PAYLOAD) | ||||
|  | ||||
|         self._uid = None | ||||
|         self._unique_identifier = None | ||||
|         self._attributes = list() | ||||
|  | ||||
|         self.uid = uid | ||||
|         self.unique_identifier = unique_identifier | ||||
|         self.attributes = attributes | ||||
|  | ||||
|     @property | ||||
|     def uid(self): | ||||
|         if self._uid: | ||||
|             return self._uid.value | ||||
|     def unique_identifier(self): | ||||
|         if self._unique_identifier: | ||||
|             return self._unique_identifier.value | ||||
|         else: | ||||
|             return self._uid | ||||
|             return self._unique_identifier | ||||
|  | ||||
|     @uid.setter | ||||
|     def uid(self, value): | ||||
|     @unique_identifier.setter | ||||
|     def unique_identifier(self, value): | ||||
|         if value is None: | ||||
|             self._uid = None | ||||
|             self._unique_identifier = None | ||||
|         elif isinstance(value, six.string_types): | ||||
|             self._uid = primitives.TextString( | ||||
|             self._unique_identifier = primitives.TextString( | ||||
|                 value=value, | ||||
|                 tag=enums.Tags.UNIQUE_IDENTIFIER | ||||
|             ) | ||||
|         else: | ||||
|             raise TypeError("uid must be a string") | ||||
|             raise TypeError("unique identifier must be a string") | ||||
|  | ||||
|     @property | ||||
|     def attributes(self): | ||||
| @@ -279,12 +283,14 @@ class GetAttributesResponsePayload(primitives.Struct): | ||||
|         tstream = utils.BytearrayStream(istream.read(self.length)) | ||||
|  | ||||
|         if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream): | ||||
|             uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER) | ||||
|             uid.read(tstream) | ||||
|             self.uid = uid.value | ||||
|             unique_identifier = primitives.TextString( | ||||
|                 tag=enums.Tags.UNIQUE_IDENTIFIER | ||||
|             ) | ||||
|             unique_identifier.read(tstream) | ||||
|             self.unique_identifier = unique_identifier.value | ||||
|         else: | ||||
|             raise exceptions.InvalidKmipEncoding( | ||||
|                 "expected GetAttributes response uid not found" | ||||
|                 "expected GetAttributes response unique identifier not found" | ||||
|             ) | ||||
|  | ||||
|         self._attributes = list() | ||||
| @@ -306,11 +312,11 @@ class GetAttributesResponsePayload(primitives.Struct): | ||||
|         """ | ||||
|         tstream = utils.BytearrayStream() | ||||
|  | ||||
|         if self._uid: | ||||
|             self._uid.write(tstream) | ||||
|         if self._unique_identifier: | ||||
|             self._unique_identifier.write(tstream) | ||||
|         else: | ||||
|             raise exceptions.InvalidField( | ||||
|                 "The GetAttributes response uid is required." | ||||
|                 "The GetAttributes response unique identifier is required." | ||||
|             ) | ||||
|  | ||||
|         for attribute in self._attributes: | ||||
| @@ -321,16 +327,24 @@ class GetAttributesResponsePayload(primitives.Struct): | ||||
|         ostream.write(tstream.buffer) | ||||
|  | ||||
|     def __repr__(self): | ||||
|         uid = "uid={0}".format(self.uid) | ||||
|         unique_identifier = "unique_identifier={0}".format( | ||||
|             self.unique_identifier | ||||
|         ) | ||||
|         names = "attributes={0}".format(self.attributes) | ||||
|         return "GetAttributesResponsePayload({0}, {1})".format(uid, names) | ||||
|         return "GetAttributesResponsePayload({0}, {1})".format( | ||||
|             unique_identifier, | ||||
|             names | ||||
|         ) | ||||
|  | ||||
|     def __str__(self): | ||||
|         return str({'uid': self.uid, 'attributes': self.attributes}) | ||||
|         return str({ | ||||
|             'unique_identifier': self.unique_identifier, | ||||
|             'attributes': self.attributes | ||||
|         }) | ||||
|  | ||||
|     def __eq__(self, other): | ||||
|         if isinstance(other, GetAttributesResponsePayload): | ||||
|             if self.uid != other.uid: | ||||
|             if self.unique_identifier != other.unique_identifier: | ||||
|                 return False | ||||
|             if len(self._attributes) != len(other._attributes): | ||||
|                 return False | ||||
|   | ||||
| @@ -825,11 +825,10 @@ class TextString(Base): | ||||
|  | ||||
|     def __validate(self): | ||||
|         if self.value is not None: | ||||
|             data_type = type(self.value) | ||||
|             if data_type is not str: | ||||
|             if not isinstance(self.value, six.string_types): | ||||
|                 msg = ErrorStrings.BAD_EXP_RECV | ||||
|                 raise TypeError(msg.format('TextString', 'value', str, | ||||
|                                            data_type)) | ||||
|                                            type(self.value))) | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return "{0}(value={1})".format(type(self).__name__, repr(self.value)) | ||||
|   | ||||
| @@ -29,6 +29,8 @@ import kmip | ||||
| from kmip.core import attributes | ||||
| from kmip.core import enums | ||||
| from kmip.core import exceptions | ||||
|  | ||||
| from kmip.core.factories import attributes as attribute_factory | ||||
| from kmip.core.factories import secrets | ||||
|  | ||||
| from kmip.core.messages import contents | ||||
| @@ -40,6 +42,7 @@ from kmip.core.messages.payloads import create_key_pair | ||||
| from kmip.core.messages.payloads import destroy | ||||
| from kmip.core.messages.payloads import discover_versions | ||||
| from kmip.core.messages.payloads import get | ||||
| from kmip.core.messages.payloads import get_attributes | ||||
| from kmip.core.messages.payloads import query | ||||
| from kmip.core.messages.payloads import register | ||||
|  | ||||
| @@ -492,7 +495,6 @@ class KmipEngine(object): | ||||
|                     unique_identifier | ||||
|                 ) | ||||
|             ) | ||||
|             self._logger.exception(e) | ||||
|             raise exceptions.ItemNotFound( | ||||
|                 "Could not locate object: {0}".format(unique_identifier) | ||||
|             ) | ||||
| @@ -624,6 +626,148 @@ class KmipEngine(object): | ||||
|  | ||||
|         return attributes | ||||
|  | ||||
|     def _get_attributes_from_managed_object(self, managed_object, attr_names): | ||||
|         """ | ||||
|         Given a kmip.pie object and a list of attribute names, attempt to get | ||||
|         all of the existing attribute values from the object. | ||||
|         """ | ||||
|         attr_factory = attribute_factory.AttributeFactory() | ||||
|         retrieved_attributes = list() | ||||
|  | ||||
|         if not attr_names: | ||||
|             attr_names = self._attribute_policy.get_all_attribute_names() | ||||
|  | ||||
|         for attribute_name in attr_names: | ||||
|             object_type = managed_object._object_type | ||||
|  | ||||
|             if not self._attribute_policy.is_attribute_supported( | ||||
|                     attribute_name | ||||
|             ): | ||||
|                 continue | ||||
|  | ||||
|             if self._attribute_policy.is_attribute_applicable_to_object_type( | ||||
|                 attribute_name, | ||||
|                 object_type | ||||
|             ): | ||||
|                 attribute_value = self._get_attribute_from_managed_object( | ||||
|                     managed_object, | ||||
|                     attribute_name | ||||
|                 ) | ||||
|  | ||||
|                 if attribute_value is not None: | ||||
|                     if self._attribute_policy.is_attribute_multivalued( | ||||
|                             attribute_name | ||||
|                     ): | ||||
|                         for count, value in enumerate(attribute_value): | ||||
|                             attribute = attr_factory.create_attribute( | ||||
|                                 enums.AttributeType(attribute_name), | ||||
|                                 value, | ||||
|                                 count | ||||
|                             ) | ||||
|                             retrieved_attributes.append(attribute) | ||||
|                     else: | ||||
|                         attribute = attr_factory.create_attribute( | ||||
|                             enums.AttributeType(attribute_name), | ||||
|                             attribute_value | ||||
|                         ) | ||||
|                         retrieved_attributes.append(attribute) | ||||
|  | ||||
|         return retrieved_attributes | ||||
|  | ||||
|     def _get_attribute_from_managed_object(self, managed_object, attr_name): | ||||
|         """ | ||||
|         Get the attribute value from the kmip.pie managed object. | ||||
|         """ | ||||
|         if attr_name == 'Unique Identifier': | ||||
|             return str(managed_object.unique_identifier) | ||||
|         elif attr_name == 'Name': | ||||
|             names = list() | ||||
|             for name in managed_object.names: | ||||
|                 name = attributes.Name( | ||||
|                     attributes.Name.NameValue(name), | ||||
|                     attributes.Name.NameType( | ||||
|                         enums.NameType.UNINTERPRETED_TEXT_STRING | ||||
|                     ) | ||||
|                 ) | ||||
|                 names.append(name) | ||||
|             return names | ||||
|         elif attr_name == 'Object Type': | ||||
|             return managed_object._object_type | ||||
|         elif attr_name == 'Cryptographic Algorithm': | ||||
|             return managed_object.cryptographic_algorithm | ||||
|         elif attr_name == 'Cryptographic Length': | ||||
|             return managed_object.cryptographic_length | ||||
|         elif attr_name == 'Cryptographic Parameters': | ||||
|             return None | ||||
|         elif attr_name == 'Cryptographic Domain Parameters': | ||||
|             return None | ||||
|         elif attr_name == 'Certificate Type': | ||||
|             return managed_object.certificate_type | ||||
|         elif attr_name == 'Certificate Length': | ||||
|             return None | ||||
|         elif attr_name == 'X.509 Certificate Identifier': | ||||
|             return None | ||||
|         elif attr_name == 'X.509 Certificate Subject': | ||||
|             return None | ||||
|         elif attr_name == 'X.509 Certificate Issuer': | ||||
|             return None | ||||
|         elif attr_name == 'Certificate Identifier': | ||||
|             return None | ||||
|         elif attr_name == 'Certificate Subject': | ||||
|             return None | ||||
|         elif attr_name == 'Certificate Issuer': | ||||
|             return None | ||||
|         elif attr_name == 'Digital Signature Algorithm': | ||||
|             return None | ||||
|         elif attr_name == 'Digest': | ||||
|             return None | ||||
|         elif attr_name == 'Operation Policy Name': | ||||
|             return managed_object.operation_policy_name | ||||
|         elif attr_name == 'Cryptographic Usage Mask': | ||||
|             return managed_object.cryptographic_usage_masks | ||||
|         elif attr_name == 'Lease Time': | ||||
|             return None | ||||
|         elif attr_name == 'Usage Limits': | ||||
|             return None | ||||
|         elif attr_name == 'State': | ||||
|             return managed_object.state | ||||
|         elif attr_name == 'Initial Date': | ||||
|             return None | ||||
|         elif attr_name == 'Activation Date': | ||||
|             return None | ||||
|         elif attr_name == 'Process Start Date': | ||||
|             return None | ||||
|         elif attr_name == 'Protect Stop Date': | ||||
|             return None | ||||
|         elif attr_name == 'Deactivation Date': | ||||
|             return None | ||||
|         elif attr_name == 'Destroy Date': | ||||
|             return None | ||||
|         elif attr_name == 'Compromise Occurrence Date': | ||||
|             return None | ||||
|         elif attr_name == 'Compromise Date': | ||||
|             return None | ||||
|         elif attr_name == 'Revocation Reason': | ||||
|             return None | ||||
|         elif attr_name == 'Archive Date': | ||||
|             return None | ||||
|         elif attr_name == 'Object Group': | ||||
|             return None | ||||
|         elif attr_name == 'Fresh': | ||||
|             return None | ||||
|         elif attr_name == 'Link': | ||||
|             return None | ||||
|         elif attr_name == 'Application Specific Information': | ||||
|             return None | ||||
|         elif attr_name == 'Contact Information': | ||||
|             return None | ||||
|         elif attr_name == 'Last Change Date': | ||||
|             return None | ||||
|         else: | ||||
|             # Since custom attribute names are possible, just return None | ||||
|             # for unrecognized attributes. This satisfies the spec. | ||||
|             return None | ||||
|  | ||||
|     def _set_attributes_on_managed_object(self, managed_object, attributes): | ||||
|         """ | ||||
|         Given a kmip.pie object and a dictionary of attributes, attempt to set | ||||
| @@ -761,6 +905,8 @@ class KmipEngine(object): | ||||
|             return self._process_register(payload) | ||||
|         elif operation == enums.Operation.GET: | ||||
|             return self._process_get(payload) | ||||
|         elif operation == enums.Operation.GET_ATTRIBUTES: | ||||
|             return self._process_get_attributes(payload) | ||||
|         elif operation == enums.Operation.DESTROY: | ||||
|             return self._process_destroy(payload) | ||||
|         elif operation == enums.Operation.QUERY: | ||||
| @@ -1187,6 +1333,47 @@ class KmipEngine(object): | ||||
|  | ||||
|         return response_payload | ||||
|  | ||||
|     @_kmip_version_supported('1.0') | ||||
|     def _process_get_attributes(self, payload): | ||||
|         self._logger.info("Processing operation: GetAttributes") | ||||
|  | ||||
|         if payload.unique_identifier: | ||||
|             unique_identifier = payload.unique_identifier | ||||
|         else: | ||||
|             unique_identifier = self._id_placeholder | ||||
|  | ||||
|         object_type = self._get_object_type(unique_identifier) | ||||
|  | ||||
|         managed_object = self._data_session.query(object_type).filter( | ||||
|             object_type.unique_identifier == unique_identifier | ||||
|         ).one() | ||||
|  | ||||
|         # Determine if the request should be carried out under the object's | ||||
|         # operation policy. If not, feign ignorance of the object. | ||||
|         is_allowed = self._is_allowed_by_operation_policy( | ||||
|             managed_object.operation_policy_name, | ||||
|             self._client_identity, | ||||
|             managed_object._owner, | ||||
|             managed_object._object_type, | ||||
|             enums.Operation.GET_ATTRIBUTES | ||||
|         ) | ||||
|         if not is_allowed: | ||||
|             raise exceptions.ItemNotFound( | ||||
|                 "Could not locate object: {0}".format(unique_identifier) | ||||
|             ) | ||||
|  | ||||
|         attrs = self._get_attributes_from_managed_object( | ||||
|             managed_object, | ||||
|             payload.attribute_names | ||||
|         ) | ||||
|  | ||||
|         response_payload = get_attributes.GetAttributesResponsePayload( | ||||
|             unique_identifier=unique_identifier, | ||||
|             attributes=attrs | ||||
|         ) | ||||
|  | ||||
|         return response_payload | ||||
|  | ||||
|     @_kmip_version_supported('1.0') | ||||
|     def _process_activate(self, payload): | ||||
|         self._logger.info("Processing operation: Activate") | ||||
| @@ -1308,6 +1495,7 @@ class KmipEngine(object): | ||||
|                 contents.Operation(enums.Operation.CREATE_KEY_PAIR), | ||||
|                 contents.Operation(enums.Operation.REGISTER), | ||||
|                 contents.Operation(enums.Operation.GET), | ||||
|                 contents.Operation(enums.Operation.GET_ATTRIBUTES), | ||||
|                 contents.Operation(enums.Operation.DESTROY), | ||||
|                 contents.Operation(enums.Operation.QUERY) | ||||
|             ]) | ||||
|   | ||||
| @@ -1146,3 +1146,12 @@ class AttributePolicy(object): | ||||
|         # TODO (peterhamilton) Handle multivalue swap between certificate types | ||||
|         rule_set = self._attribute_rule_sets.get(attribute) | ||||
|         return rule_set.multiple_instances_permitted | ||||
|  | ||||
|     def get_all_attribute_names(self): | ||||
|         """ | ||||
|         Get a list of all supported attribute names. | ||||
|  | ||||
|         Returns: | ||||
|             list: A list of string attribute names. | ||||
|         """ | ||||
|         return self._attribute_rule_sets.keys() | ||||
|   | ||||
| @@ -46,7 +46,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|             b'\x69\x6F\x6E\x00\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x09' | ||||
|             b'\x78\x2D\x50\x75\x72\x70\x6F\x73\x65\x00\x00\x00\x00\x00\x00\x00' | ||||
|         ) | ||||
|         self.encoding_sans_uid = utils.BytearrayStream( | ||||
|         self.encoding_sans_unique_identifier = utils.BytearrayStream( | ||||
|             b'\x42\x00\x79\x01\x00\x00\x00\x78\x42\x00\x0A\x07\x00\x00\x00\x0C' | ||||
|             b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00' | ||||
|             b'\x42\x00\x0A\x07\x00\x00\x00\x20\x41\x70\x70\x6C\x69\x63\x61\x74' | ||||
| @@ -66,7 +66,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|             b'\x42\x00\x79\x01\x00\x00\x00\x00' | ||||
|         ) | ||||
|  | ||||
|         self.uid = '1703250b-4d40-4de2-93a0-c494a1d4ae40' | ||||
|         self.unique_identifier = '1703250b-4d40-4de2-93a0-c494a1d4ae40' | ||||
|         self.attribute_names = [ | ||||
|             'Object Group', | ||||
|             'Application Specific Information', | ||||
| @@ -90,41 +90,41 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         valid value. | ||||
|         """ | ||||
|         get_attributes.GetAttributesRequestPayload( | ||||
|             'test-uid', | ||||
|             'test-unique-identifier', | ||||
|             ['test-attribute-name-1', 'test-attribute-name-2'] | ||||
|         ) | ||||
|  | ||||
|     def test_uid(self): | ||||
|     def test_unique_identifier(self): | ||||
|         """ | ||||
|         Test that the uid attribute of a GetAttributes request payload can | ||||
|         be properly set and retrieved. | ||||
|         Test that the unique_identifier attribute of a GetAttributes request | ||||
|         payload can be properly set and retrieved. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesRequestPayload() | ||||
|  | ||||
|         self.assertIsNone(payload.uid) | ||||
|         self.assertIsNone(payload._uid) | ||||
|         self.assertIsNone(payload.unique_identifier) | ||||
|         self.assertIsNone(payload._unique_identifier) | ||||
|  | ||||
|         payload.uid = 'test-uid' | ||||
|         payload.unique_identifier = 'test-unique-identifier' | ||||
|  | ||||
|         self.assertEqual('test-uid', payload.uid) | ||||
|         self.assertEqual('test-unique-identifier', payload.unique_identifier) | ||||
|         self.assertEqual( | ||||
|             primitives.TextString( | ||||
|                 value='test-uid', | ||||
|                 value='test-unique-identifier', | ||||
|                 tag=enums.Tags.UNIQUE_IDENTIFIER | ||||
|             ), | ||||
|             payload._uid | ||||
|             payload._unique_identifier | ||||
|         ) | ||||
|  | ||||
|     def test_uid_with_invalid_value(self): | ||||
|     def test_unique_identifier_with_invalid_value(self): | ||||
|         """ | ||||
|         Test that a TypeError is raised when an invalid ID is used to set | ||||
|         the uid attribute of a GetAttributes request payload. | ||||
|         the unique_identifier attribute of a GetAttributes request payload. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesRequestPayload() | ||||
|         args = (payload, 'uid', 0) | ||||
|         args = (payload, 'unique_identifier', 0) | ||||
|         self.assertRaisesRegexp( | ||||
|             TypeError, | ||||
|             "uid must be a string", | ||||
|             "unique identifier must be a string", | ||||
|             setattr, | ||||
|             *args | ||||
|         ) | ||||
| @@ -239,18 +239,18 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesRequestPayload() | ||||
|  | ||||
|         self.assertEqual(None, payload._uid) | ||||
|         self.assertEqual(None, payload._unique_identifier) | ||||
|         self.assertEqual(list(), payload._attribute_names) | ||||
|  | ||||
|         payload.read(self.full_encoding) | ||||
|  | ||||
|         self.assertEqual(self.uid, payload.uid) | ||||
|         self.assertEqual(self.unique_identifier, payload.unique_identifier) | ||||
|         self.assertEqual( | ||||
|             primitives.TextString( | ||||
|                 value=self.uid, | ||||
|                 value=self.unique_identifier, | ||||
|                 tag=enums.Tags.UNIQUE_IDENTIFIER | ||||
|             ), | ||||
|             payload._uid | ||||
|             payload._unique_identifier | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             set(self.attribute_names), | ||||
| @@ -265,20 +265,20 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|                 payload._attribute_names | ||||
|             ) | ||||
|  | ||||
|     def test_read_with_no_uid(self): | ||||
|     def test_read_with_no_unique_identifier(self): | ||||
|         """ | ||||
|         Test that a GetAttributes request payload with no ID can be read | ||||
|         from a data stream. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesRequestPayload() | ||||
|  | ||||
|         self.assertEqual(None, payload._uid) | ||||
|         self.assertEqual(None, payload._unique_identifier) | ||||
|         self.assertEqual(list(), payload._attribute_names) | ||||
|  | ||||
|         payload.read(self.encoding_sans_uid) | ||||
|         payload.read(self.encoding_sans_unique_identifier) | ||||
|  | ||||
|         self.assertEqual(None, payload.uid) | ||||
|         self.assertEqual(None, payload._uid) | ||||
|         self.assertEqual(None, payload.unique_identifier) | ||||
|         self.assertEqual(None, payload._unique_identifier) | ||||
|         self.assertEqual( | ||||
|             set(self.attribute_names), | ||||
|             set(payload.attribute_names) | ||||
| @@ -299,18 +299,18 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesRequestPayload() | ||||
|  | ||||
|         self.assertEqual(None, payload._uid) | ||||
|         self.assertEqual(None, payload._unique_identifier) | ||||
|         self.assertEqual(list(), payload._attribute_names) | ||||
|  | ||||
|         payload.read(self.encoding_sans_attribute_names) | ||||
|  | ||||
|         self.assertEqual(self.uid, payload.uid) | ||||
|         self.assertEqual(self.unique_identifier, payload.unique_identifier) | ||||
|         self.assertEqual( | ||||
|             primitives.TextString( | ||||
|                 value=self.uid, | ||||
|                 value=self.unique_identifier, | ||||
|                 tag=enums.Tags.UNIQUE_IDENTIFIER | ||||
|             ), | ||||
|             payload._uid | ||||
|             payload._unique_identifier | ||||
|         ) | ||||
|         self.assertEqual(list(), payload.attribute_names) | ||||
|         self.assertEqual(list(), payload._attribute_names) | ||||
| @@ -322,13 +322,13 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesRequestPayload() | ||||
|  | ||||
|         self.assertEqual(None, payload._uid) | ||||
|         self.assertEqual(None, payload._unique_identifier) | ||||
|         self.assertEqual(list(), payload._attribute_names) | ||||
|  | ||||
|         payload.read(self.empty_encoding) | ||||
|  | ||||
|         self.assertEqual(None, payload.uid) | ||||
|         self.assertEqual(None, payload._uid) | ||||
|         self.assertEqual(None, payload.unique_identifier) | ||||
|         self.assertEqual(None, payload._unique_identifier) | ||||
|         self.assertEqual(list(), payload.attribute_names) | ||||
|         self.assertEqual(list(), payload._attribute_names) | ||||
|  | ||||
| @@ -338,7 +338,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         stream. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         stream = utils.BytearrayStream() | ||||
| @@ -347,7 +347,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         self.assertEqual(len(self.full_encoding), len(stream)) | ||||
|         self.assertEqual(str(self.full_encoding), str(stream)) | ||||
|  | ||||
|     def test_write_with_no_uid(self): | ||||
|     def test_write_with_no_unique_identifier(self): | ||||
|         """ | ||||
|         Test that a GetAttributes request payload with no ID can be written | ||||
|         to a data stream. | ||||
| @@ -359,8 +359,14 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         stream = utils.BytearrayStream() | ||||
|         payload.write(stream) | ||||
|  | ||||
|         self.assertEqual(len(self.encoding_sans_uid), len(stream)) | ||||
|         self.assertEqual(str(self.encoding_sans_uid), str(stream)) | ||||
|         self.assertEqual( | ||||
|             len(self.encoding_sans_unique_identifier), | ||||
|             len(stream) | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             str(self.encoding_sans_unique_identifier), | ||||
|             str(stream) | ||||
|         ) | ||||
|  | ||||
|     def test_write_with_no_attribute_names(self): | ||||
|         """ | ||||
| @@ -368,7 +374,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         can be written to a data stream. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             None | ||||
|         ) | ||||
|         stream = utils.BytearrayStream() | ||||
| @@ -394,21 +400,23 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         Test that repr can be applied to a GetAttributes request payload. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         uid = "uid={0}".format(payload.uid) | ||||
|         unique_identifier = "unique_identifier={0}".format( | ||||
|             payload.unique_identifier | ||||
|         ) | ||||
|         attribute_names = "attribute_names={0}".format( | ||||
|             payload.attribute_names | ||||
|         ) | ||||
|         expected = "GetAttributesRequestPayload({0}, {1})".format( | ||||
|             uid, | ||||
|             unique_identifier, | ||||
|             attribute_names | ||||
|         ) | ||||
|         observed = repr(payload) | ||||
|         self.assertEqual(expected, observed) | ||||
|  | ||||
|     def test_repr_with_no_uid(self): | ||||
|     def test_repr_with_no_unique_identifier(self): | ||||
|         """ | ||||
|         Test that repr can be applied to a GetAttributes request payload with | ||||
|         no ID. | ||||
| @@ -417,12 +425,14 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|             None, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         uid = "uid={0}".format(payload.uid) | ||||
|         unique_identifier = "unique_identifier={0}".format( | ||||
|             payload.unique_identifier | ||||
|         ) | ||||
|         attribute_names = "attribute_names={0}".format( | ||||
|             payload.attribute_names | ||||
|         ) | ||||
|         expected = "GetAttributesRequestPayload({0}, {1})".format( | ||||
|             uid, | ||||
|             unique_identifier, | ||||
|             attribute_names | ||||
|         ) | ||||
|         observed = repr(payload) | ||||
| @@ -434,15 +444,17 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         no attribute names. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             None | ||||
|         ) | ||||
|         uid = "uid={0}".format(payload.uid) | ||||
|         unique_identifier = "unique_identifier={0}".format( | ||||
|             payload.unique_identifier | ||||
|         ) | ||||
|         attribute_names = "attribute_names={0}".format( | ||||
|             payload.attribute_names | ||||
|         ) | ||||
|         expected = "GetAttributesRequestPayload({0}, {1})".format( | ||||
|             uid, | ||||
|             unique_identifier, | ||||
|             attribute_names | ||||
|         ) | ||||
|         observed = repr(payload) | ||||
| @@ -457,12 +469,14 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|             None, | ||||
|             None | ||||
|         ) | ||||
|         uid = "uid={0}".format(payload.uid) | ||||
|         unique_identifier = "unique_identifier={0}".format( | ||||
|             payload.unique_identifier | ||||
|         ) | ||||
|         attribute_names = "attribute_names={0}".format( | ||||
|             payload.attribute_names | ||||
|         ) | ||||
|         expected = "GetAttributesRequestPayload({0}, {1})".format( | ||||
|             uid, | ||||
|             unique_identifier, | ||||
|             attribute_names | ||||
|         ) | ||||
|         observed = repr(payload) | ||||
| @@ -473,11 +487,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         Test that str can be applied to a GetAttributes request payload. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         expected = str({ | ||||
|             'uid': self.uid, | ||||
|             'unique_identifier': self.unique_identifier, | ||||
|             'attribute_names': self.attribute_names | ||||
|         }) | ||||
|         observed = str(payload) | ||||
| @@ -493,7 +507,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         expected = str({ | ||||
|             'uid': None, | ||||
|             'unique_identifier': None, | ||||
|             'attribute_names': self.attribute_names | ||||
|         }) | ||||
|         observed = str(payload) | ||||
| @@ -505,11 +519,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         no attribute names. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             None | ||||
|         ) | ||||
|         expected = str({ | ||||
|             'uid': self.uid, | ||||
|             'unique_identifier': self.unique_identifier, | ||||
|             'attribute_names': list() | ||||
|         }) | ||||
|         observed = str(payload) | ||||
| @@ -525,7 +539,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|             None | ||||
|         ) | ||||
|         expected = str({ | ||||
|             'uid': None, | ||||
|             'unique_identifier': None, | ||||
|             'attribute_names': list() | ||||
|         }) | ||||
|         observed = str(payload) | ||||
| @@ -537,11 +551,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         GetAttributes request payloads with the same data. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|  | ||||
| @@ -555,25 +569,25 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         but with different attribute name orderings. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         self.attribute_names.reverse() | ||||
|         b = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|  | ||||
|         self.assertTrue(a == b) | ||||
|         self.assertTrue(b == a) | ||||
|  | ||||
|     def test_equal_on_not_equal_uid(self): | ||||
|     def test_equal_on_not_equal_unique_identifier(self): | ||||
|         """ | ||||
|         Test that the equality operator returns False when comparing two | ||||
|         GetAttributes request payloads with different IDs. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesRequestPayload( | ||||
| @@ -590,11 +604,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         GetAttributes request payloads with different attribute names. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             None | ||||
|         ) | ||||
|  | ||||
| @@ -608,7 +622,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         payload. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         b = "invalid" | ||||
| @@ -622,24 +636,24 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         two GetAttributes request payloads with the same internal data. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|  | ||||
|         self.assertFalse(a != b) | ||||
|         self.assertFalse(b != a) | ||||
|  | ||||
|     def test_not_equal_on_not_equal_uid(self): | ||||
|     def test_not_equal_on_not_equal_unique_identifier(self): | ||||
|         """ | ||||
|         Test that the inequality operator returns True when comparing two | ||||
|         GetAttributes request payloads with different IDs. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesRequestPayload( | ||||
| @@ -656,11 +670,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         GetAttributes request payloads with different attribute names. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             None | ||||
|         ) | ||||
|  | ||||
| @@ -674,7 +688,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase): | ||||
|         payload. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesRequestPayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attribute_names | ||||
|         ) | ||||
|         b = "invalid" | ||||
| @@ -715,7 +729,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|             b'\x42\x00\x0B\x07\x00\x00\x00\x0D\x64\x65\x6D\x6F\x6E\x73\x74\x72' | ||||
|             b'\x61\x74\x69\x6F\x6E\x00\x00\x00' | ||||
|         ) | ||||
|         self.encoding_sans_uid = utils.BytearrayStream( | ||||
|         self.encoding_sans_unique_identifier = utils.BytearrayStream( | ||||
|             b'\x42\x00\x7C\x01\x00\x00\x01\x00\x42\x00\x08\x01\x00\x00\x00\x28' | ||||
|             b'\x42\x00\x0A\x07\x00\x00\x00\x0C\x4F\x62\x6A\x65\x63\x74\x20\x47' | ||||
|             b'\x72\x6F\x75\x70\x00\x00\x00\x00\x42\x00\x0B\x07\x00\x00\x00\x06' | ||||
| @@ -741,7 +755,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|             b'\x61\x65\x34\x30\x00\x00\x00\x00' | ||||
|         ) | ||||
|  | ||||
|         self.uid = '1703250b-4d40-4de2-93a0-c494a1d4ae40' | ||||
|         self.unique_identifier = '1703250b-4d40-4de2-93a0-c494a1d4ae40' | ||||
|         self.attributes = [ | ||||
|             objects.Attribute( | ||||
|                 attribute_name=objects.Attribute.AttributeName( | ||||
| @@ -785,41 +799,41 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         valid value. | ||||
|         """ | ||||
|         get_attributes.GetAttributesResponsePayload( | ||||
|             'test-uid', | ||||
|             'test-unique-identifier', | ||||
|             [objects.Attribute(), objects.Attribute()] | ||||
|         ) | ||||
|  | ||||
|     def test_uid(self): | ||||
|     def test_unique_identifier(self): | ||||
|         """ | ||||
|         Test that the uid attribute of a GetAttributes response payload can | ||||
|         be properly set and retrieved. | ||||
|         Test that the unique_identifier attribute of a GetAttributes response | ||||
|         payload can be properly set and retrieved. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesResponsePayload() | ||||
|  | ||||
|         self.assertIsNone(payload.uid) | ||||
|         self.assertIsNone(payload._uid) | ||||
|         self.assertIsNone(payload.unique_identifier) | ||||
|         self.assertIsNone(payload._unique_identifier) | ||||
|  | ||||
|         payload.uid = 'test-uid' | ||||
|         payload.unique_identifier = 'test-unique-identifier' | ||||
|  | ||||
|         self.assertEqual('test-uid', payload.uid) | ||||
|         self.assertEqual('test-unique-identifier', payload.unique_identifier) | ||||
|         self.assertEqual( | ||||
|             primitives.TextString( | ||||
|                 value='test-uid', | ||||
|                 value='test-unique-identifier', | ||||
|                 tag=enums.Tags.UNIQUE_IDENTIFIER | ||||
|             ), | ||||
|             payload._uid | ||||
|             payload._unique_identifier | ||||
|         ) | ||||
|  | ||||
|     def test_uid_with_invalid_value(self): | ||||
|     def test_unique_identifier_with_invalid_value(self): | ||||
|         """ | ||||
|         Test that a TypeError is raised when an invalid ID is used to set | ||||
|         the uid attribute of a GetAttributes response payload. | ||||
|         the unique_identifier attribute of a GetAttributes response payload. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesResponsePayload() | ||||
|         args = (payload, 'uid', 0) | ||||
|         args = (payload, 'unique_identifier', 0) | ||||
|         self.assertRaisesRegexp( | ||||
|             TypeError, | ||||
|             "uid must be a string", | ||||
|             "unique identifier must be a string", | ||||
|             setattr, | ||||
|             *args | ||||
|         ) | ||||
| @@ -885,18 +899,18 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesResponsePayload() | ||||
|  | ||||
|         self.assertEqual(None, payload._uid) | ||||
|         self.assertEqual(None, payload._unique_identifier) | ||||
|         self.assertEqual(list(), payload._attributes) | ||||
|  | ||||
|         payload.read(self.full_encoding) | ||||
|  | ||||
|         self.assertEqual(self.uid, payload.uid) | ||||
|         self.assertEqual(self.unique_identifier, payload.unique_identifier) | ||||
|         self.assertEqual( | ||||
|             primitives.TextString( | ||||
|                 value=self.uid, | ||||
|                 value=self.unique_identifier, | ||||
|                 tag=enums.Tags.UNIQUE_IDENTIFIER | ||||
|             ), | ||||
|             payload._uid | ||||
|             payload._unique_identifier | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             len(self.attributes), | ||||
| @@ -908,20 +922,20 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|                 payload._attributes | ||||
|             ) | ||||
|  | ||||
|     def test_read_with_no_uid(self): | ||||
|     def test_read_with_no_unique_identifier(self): | ||||
|         """ | ||||
|         Test that an InvalidKmipEncoding error gets raised when attempting to | ||||
|         read a GetAttributes response encoding with no ID data. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesResponsePayload() | ||||
|  | ||||
|         self.assertEqual(None, payload._uid) | ||||
|         self.assertEqual(None, payload._unique_identifier) | ||||
|         self.assertEqual(list(), payload._attributes) | ||||
|  | ||||
|         args = (self.encoding_sans_uid, ) | ||||
|         args = (self.encoding_sans_unique_identifier, ) | ||||
|         self.assertRaisesRegexp( | ||||
|             exceptions.InvalidKmipEncoding, | ||||
|             "expected GetAttributes response uid not found", | ||||
|             "expected GetAttributes response unique identifier not found", | ||||
|             payload.read, | ||||
|             *args | ||||
|         ) | ||||
| @@ -933,18 +947,18 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesResponsePayload() | ||||
|  | ||||
|         self.assertEqual(None, payload._uid) | ||||
|         self.assertEqual(None, payload._unique_identifier) | ||||
|         self.assertEqual(list(), payload._attributes) | ||||
|  | ||||
|         payload.read(self.encoding_sans_attributes) | ||||
|  | ||||
|         self.assertEqual(self.uid, payload.uid) | ||||
|         self.assertEqual(self.unique_identifier, payload.unique_identifier) | ||||
|         self.assertEqual( | ||||
|             primitives.TextString( | ||||
|                 value=self.uid, | ||||
|                 value=self.unique_identifier, | ||||
|                 tag=enums.Tags.UNIQUE_IDENTIFIER | ||||
|             ), | ||||
|             payload._uid | ||||
|             payload._unique_identifier | ||||
|         ) | ||||
|         self.assertEqual(list(), payload.attributes) | ||||
|         self.assertEqual(list(), payload._attributes) | ||||
| @@ -955,7 +969,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         stream. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|         stream = utils.BytearrayStream() | ||||
| @@ -964,7 +978,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         self.assertEqual(len(self.full_encoding), len(stream)) | ||||
|         self.assertEqual(str(self.full_encoding), str(stream)) | ||||
|  | ||||
|     def test_write_with_no_uid(self): | ||||
|     def test_write_with_no_unique_identifier(self): | ||||
|         """ | ||||
|         Test that a GetAttributes request payload with no ID can be written | ||||
|         to a data stream. | ||||
| @@ -978,7 +992,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         args = (stream, ) | ||||
|         self.assertRaisesRegexp( | ||||
|             exceptions.InvalidField, | ||||
|             "The GetAttributes response uid is required.", | ||||
|             "The GetAttributes response unique identifier is required.", | ||||
|             payload.write, | ||||
|             *args | ||||
|         ) | ||||
| @@ -989,7 +1003,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         data can be written to a data stream. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             None | ||||
|         ) | ||||
|         stream = utils.BytearrayStream() | ||||
| @@ -1003,15 +1017,17 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         Test that repr can be applied to a GetAttributes response payload. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|         uid = "uid={0}".format(payload.uid) | ||||
|         unique_identifier = "unique_identifier={0}".format( | ||||
|             payload.unique_identifier | ||||
|         ) | ||||
|         payload_attributes = "attributes={0}".format( | ||||
|             payload.attributes | ||||
|         ) | ||||
|         expected = "GetAttributesResponsePayload({0}, {1})".format( | ||||
|             uid, | ||||
|             unique_identifier, | ||||
|             payload_attributes | ||||
|         ) | ||||
|         observed = repr(payload) | ||||
| @@ -1022,11 +1038,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         Test that str can be applied to a GetAttributes response payload. | ||||
|         """ | ||||
|         payload = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|         expected = str({ | ||||
|             'uid': self.uid, | ||||
|             'unique_identifier': self.unique_identifier, | ||||
|             'attributes': self.attributes | ||||
|         }) | ||||
|         observed = str(payload) | ||||
| @@ -1038,24 +1054,24 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         GetAttributes response payloads with the same data. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|  | ||||
|         self.assertTrue(a == b) | ||||
|         self.assertTrue(b == a) | ||||
|  | ||||
|     def test_equal_on_not_equal_uid(self): | ||||
|     def test_equal_on_not_equal_unique_identifier(self): | ||||
|         """ | ||||
|         Test that the equality operator returns False when comparing two | ||||
|         GetAttributes response payloads with different data. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesResponsePayload( | ||||
| @@ -1072,13 +1088,13 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         GetAttributes response payloads with different data. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|         reversed_attributes = copy.deepcopy(self.attributes) | ||||
|         reversed_attributes.reverse() | ||||
|         b = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             reversed_attributes | ||||
|         ) | ||||
|  | ||||
| @@ -1091,11 +1107,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         GetAttributes response payloads with different data. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             list() | ||||
|         ) | ||||
|  | ||||
| @@ -1108,11 +1124,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         GetAttributes response payloads with different data. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             None | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|  | ||||
| @@ -1126,7 +1142,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         payload. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|         b = 'invalid' | ||||
| @@ -1140,24 +1156,24 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         two GetAttributes response payloads with the same internal data. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|  | ||||
|         self.assertFalse(a != b) | ||||
|         self.assertFalse(b != a) | ||||
|  | ||||
|     def test_not_equal_on_not_equal_uid(self): | ||||
|     def test_not_equal_on_not_equal_unique_identifier(self): | ||||
|         """ | ||||
|         Test that the inequality operator returns True when comparing two | ||||
|         GetAttributes request payloads with different data. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesResponsePayload( | ||||
| @@ -1174,13 +1190,13 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         GetAttributes response payloads with different data. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|         reversed_attributes = copy.deepcopy(self.attributes) | ||||
|         reversed_attributes.reverse() | ||||
|         b = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             reversed_attributes | ||||
|         ) | ||||
|  | ||||
| @@ -1193,11 +1209,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         GetAttributes response payloads with different data. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             list() | ||||
|         ) | ||||
|  | ||||
| @@ -1210,11 +1226,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         GetAttributes response payloads with different data. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             None | ||||
|         ) | ||||
|         b = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|  | ||||
| @@ -1228,7 +1244,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase): | ||||
|         payload. | ||||
|         """ | ||||
|         a = get_attributes.GetAttributesResponsePayload( | ||||
|             self.uid, | ||||
|             self.unique_identifier, | ||||
|             self.attributes | ||||
|         ) | ||||
|         b = "invalid" | ||||
|   | ||||
| @@ -43,6 +43,7 @@ from kmip.core.messages.payloads import create_key_pair | ||||
| from kmip.core.messages.payloads import destroy | ||||
| from kmip.core.messages.payloads import discover_versions | ||||
| from kmip.core.messages.payloads import get | ||||
| from kmip.core.messages.payloads import get_attributes | ||||
| from kmip.core.messages.payloads import query | ||||
| from kmip.core.messages.payloads import register | ||||
|  | ||||
| @@ -859,6 +860,7 @@ class TestKmipEngine(testtools.TestCase): | ||||
|         e._process_create_key_pair = mock.MagicMock() | ||||
|         e._process_register = mock.MagicMock() | ||||
|         e._process_get = mock.MagicMock() | ||||
|         e._process_get_attributes = mock.MagicMock() | ||||
|         e._process_destroy = mock.MagicMock() | ||||
|         e._process_query = mock.MagicMock() | ||||
|         e._process_discover_versions = mock.MagicMock() | ||||
| @@ -867,6 +869,7 @@ class TestKmipEngine(testtools.TestCase): | ||||
|         e._process_operation(enums.Operation.CREATE_KEY_PAIR, None) | ||||
|         e._process_operation(enums.Operation.REGISTER, None) | ||||
|         e._process_operation(enums.Operation.GET, None) | ||||
|         e._process_operation(enums.Operation.GET_ATTRIBUTES, None) | ||||
|         e._process_operation(enums.Operation.DESTROY, None) | ||||
|         e._process_operation(enums.Operation.QUERY, None) | ||||
|         e._process_operation(enums.Operation.DISCOVER_VERSIONS, None) | ||||
| @@ -875,6 +878,7 @@ class TestKmipEngine(testtools.TestCase): | ||||
|         e._process_create_key_pair.assert_called_with(None) | ||||
|         e._process_register.assert_called_with(None) | ||||
|         e._process_get.assert_called_with(None) | ||||
|         e._process_get_attributes.assert_called_with(None) | ||||
|         e._process_destroy.assert_called_with(None) | ||||
|         e._process_query.assert_called_with(None) | ||||
|         e._process_discover_versions.assert_called_with(None) | ||||
| @@ -945,7 +949,6 @@ class TestKmipEngine(testtools.TestCase): | ||||
|         e._logger.warning.assert_called_once_with( | ||||
|             "Could not identify object type for object: 1" | ||||
|         ) | ||||
|         self.assertTrue(e._logger.exception.called) | ||||
|  | ||||
|     def test_get_object_type_multiple_objects(self): | ||||
|         """ | ||||
| @@ -1352,6 +1355,345 @@ class TestKmipEngine(testtools.TestCase): | ||||
|             *args | ||||
|         ) | ||||
|  | ||||
|     def test_get_attributes_from_managed_object(self): | ||||
|         """ | ||||
|         Test that multiple attributes can be retrieved from a given managed | ||||
|         object. | ||||
|         """ | ||||
|         e = engine.KmipEngine() | ||||
|         e._data_store = self.engine | ||||
|         e._data_store_session_factory = self.session_factory | ||||
|         e._data_session = e._data_store_session_factory() | ||||
|         e._logger = mock.MagicMock() | ||||
|  | ||||
|         symmetric_key = pie_objects.SymmetricKey( | ||||
|             enums.CryptographicAlgorithm.AES, | ||||
|             0, | ||||
|             b'', | ||||
|             masks=[enums.CryptographicUsageMask.ENCRYPT, | ||||
|                    enums.CryptographicUsageMask.DECRYPT] | ||||
|         ) | ||||
|         symmetric_key.names = ['Name 1', 'Name 2'] | ||||
|  | ||||
|         e._data_session.add(symmetric_key) | ||||
|         e._data_session.commit() | ||||
|         e._data_session = e._data_store_session_factory() | ||||
|  | ||||
|         result = e._get_attributes_from_managed_object( | ||||
|             symmetric_key, | ||||
|             ['Unique Identifier', | ||||
|              'Name', | ||||
|              'Cryptographic Algorithm', | ||||
|              'Cryptographic Length', | ||||
|              'Cryptographic Usage Mask', | ||||
|              'invalid'] | ||||
|         ) | ||||
|         attribute_factory = factory.AttributeFactory() | ||||
|  | ||||
|         self.assertEqual(6, len(result)) | ||||
|  | ||||
|         attribute = attribute_factory.create_attribute( | ||||
|             enums.AttributeType.UNIQUE_IDENTIFIER, | ||||
|             '1' | ||||
|         ) | ||||
|         self.assertIn(attribute, result) | ||||
|  | ||||
|         attribute = attribute_factory.create_attribute( | ||||
|             enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, | ||||
|             enums.CryptographicAlgorithm.AES | ||||
|         ) | ||||
|         self.assertIn(attribute, result) | ||||
|  | ||||
|         attribute = attribute_factory.create_attribute( | ||||
|             enums.AttributeType.CRYPTOGRAPHIC_LENGTH, | ||||
|             0 | ||||
|         ) | ||||
|         self.assertIn(attribute, result) | ||||
|  | ||||
|         attribute = attribute_factory.create_attribute( | ||||
|             enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, | ||||
|             [enums.CryptographicUsageMask.ENCRYPT, | ||||
|              enums.CryptographicUsageMask.DECRYPT] | ||||
|         ) | ||||
|         self.assertIn(attribute, result) | ||||
|  | ||||
|     def test_get_attribute_from_managed_object(self): | ||||
|         """ | ||||
|         Test that an attribute can be retrieved from a given managed object. | ||||
|         """ | ||||
|         e = engine.KmipEngine() | ||||
|         e._data_store = self.engine | ||||
|         e._data_store_session_factory = self.session_factory | ||||
|         e._data_session = e._data_store_session_factory() | ||||
|         e._logger = mock.MagicMock() | ||||
|  | ||||
|         symmetric_key = pie_objects.SymmetricKey( | ||||
|             enums.CryptographicAlgorithm.AES, | ||||
|             0, | ||||
|             b'', | ||||
|             masks=[enums.CryptographicUsageMask.ENCRYPT, | ||||
|                    enums.CryptographicUsageMask.DECRYPT] | ||||
|         ) | ||||
|         certificate = pie_objects.X509Certificate( | ||||
|             b'' | ||||
|         ) | ||||
|         opaque_object = pie_objects.OpaqueObject( | ||||
|             b'', | ||||
|             enums.OpaqueDataType.NONE | ||||
|         ) | ||||
|  | ||||
|         e._data_session.add(symmetric_key) | ||||
|         e._data_session.add(certificate) | ||||
|         e._data_session.add(opaque_object) | ||||
|         e._data_session.commit() | ||||
|         e._data_session = e._data_store_session_factory() | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Unique Identifier' | ||||
|         ) | ||||
|         self.assertEqual('1', result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Name' | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             [attributes.Name( | ||||
|                 attributes.Name.NameValue('Symmetric Key'), | ||||
|                 attributes.Name.NameType( | ||||
|                     enums.NameType.UNINTERPRETED_TEXT_STRING | ||||
|                 ) | ||||
|             )], | ||||
|             result | ||||
|         ) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Object Type' | ||||
|         ) | ||||
|         self.assertEqual(enums.ObjectType.SYMMETRIC_KEY, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Cryptographic Algorithm' | ||||
|         ) | ||||
|         self.assertEqual(enums.CryptographicAlgorithm.AES, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Cryptographic Length' | ||||
|         ) | ||||
|         self.assertEqual(0, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             certificate, | ||||
|             'Cryptographic Parameters' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             certificate, | ||||
|             'Cryptographic Domain Parameters' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             certificate, | ||||
|             'Certificate Type' | ||||
|         ) | ||||
|         self.assertEqual(enums.CertificateTypeEnum.X_509, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             certificate, | ||||
|             'Certificate Length' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             certificate, | ||||
|             'X.509 Certificate Identifier' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             certificate, | ||||
|             'X.509 Certificate Subject' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             certificate, | ||||
|             'X.509 Certificate Issuer' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             certificate, | ||||
|             'Certificate Identifier' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             certificate, | ||||
|             'Certificate Subject' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             certificate, | ||||
|             'Certificate Issuer' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             certificate, | ||||
|             'Digital Signature Algorithm' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             opaque_object, | ||||
|             'Digest' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Operation Policy Name' | ||||
|         ) | ||||
|         self.assertEqual('default', result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Cryptographic Usage Mask' | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             [enums.CryptographicUsageMask.ENCRYPT, | ||||
|              enums.CryptographicUsageMask.DECRYPT], | ||||
|             result | ||||
|         ) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Lease Time' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Usage Limits' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'State' | ||||
|         ) | ||||
|         self.assertEqual(enums.State.PRE_ACTIVE, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Initial Date' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Activation Date' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Process Start Date' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Protect Stop Date' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Deactivation Date' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Destroy Date' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Compromise Occurrence Date' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Compromise Date' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Revocation Reason' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Archive Date' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Object Group' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Fresh' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Link' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Application Specific Information' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Contact Information' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'Last Change Date' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|         result = e._get_attribute_from_managed_object( | ||||
|             symmetric_key, | ||||
|             'invalid' | ||||
|         ) | ||||
|         self.assertEqual(None, result) | ||||
|  | ||||
|     def test_set_attributes_on_managed_object(self): | ||||
|         """ | ||||
|         Test that multiple attributes can be set on a given managed object. | ||||
| @@ -3254,6 +3596,178 @@ class TestKmipEngine(testtools.TestCase): | ||||
|             *args | ||||
|         ) | ||||
|  | ||||
|     def test_get_attributes(self): | ||||
|         """ | ||||
|         Test that a GetAttributes request can be processed correctly. | ||||
|         """ | ||||
|         e = engine.KmipEngine() | ||||
|         e._data_store = self.engine | ||||
|         e._data_store_session_factory = self.session_factory | ||||
|         e._data_session = e._data_store_session_factory() | ||||
|         e._logger = mock.MagicMock() | ||||
|  | ||||
|         secret = pie_objects.SymmetricKey( | ||||
|             enums.CryptographicAlgorithm.AES, | ||||
|             0, | ||||
|             b'' | ||||
|         ) | ||||
|  | ||||
|         e._data_session.add(secret) | ||||
|         e._data_session.commit() | ||||
|         e._data_session = e._data_store_session_factory() | ||||
|  | ||||
|         payload = get_attributes.GetAttributesRequestPayload( | ||||
|             unique_identifier='1', | ||||
|             attribute_names=['Object Type', 'Cryptographic Algorithm'] | ||||
|         ) | ||||
|  | ||||
|         response_payload = e._process_get_attributes(payload) | ||||
|         e._data_session.commit() | ||||
|         e._data_session = e._data_store_session_factory() | ||||
|  | ||||
|         e._logger.info.assert_any_call( | ||||
|             "Processing operation: GetAttributes" | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             '1', | ||||
|             response_payload.unique_identifier | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             2, | ||||
|             len(response_payload.attributes) | ||||
|         ) | ||||
|  | ||||
|         attribute_factory = factory.AttributeFactory() | ||||
|  | ||||
|         attribute = attribute_factory.create_attribute( | ||||
|             enums.AttributeType.OBJECT_TYPE, | ||||
|             enums.ObjectType.SYMMETRIC_KEY | ||||
|         ) | ||||
|         self.assertIn(attribute, response_payload.attributes) | ||||
|  | ||||
|         attribute = attribute_factory.create_attribute( | ||||
|             enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, | ||||
|             enums.CryptographicAlgorithm.AES | ||||
|         ) | ||||
|         self.assertIn(attribute, response_payload.attributes) | ||||
|  | ||||
|     def test_get_attributes_with_no_arguments(self): | ||||
|         """ | ||||
|         Test that a GetAttributes request with no arguments can be processed | ||||
|         correctly. | ||||
|         """ | ||||
|         e = engine.KmipEngine() | ||||
|         e._data_store = self.engine | ||||
|         e._data_store_session_factory = self.session_factory | ||||
|         e._data_session = e._data_store_session_factory() | ||||
|         e._logger = mock.MagicMock() | ||||
|  | ||||
|         secret = pie_objects.SymmetricKey( | ||||
|             enums.CryptographicAlgorithm.AES, | ||||
|             0, | ||||
|             b'' | ||||
|         ) | ||||
|  | ||||
|         e._data_session.add(secret) | ||||
|         e._data_session.commit() | ||||
|         e._data_session = e._data_store_session_factory() | ||||
|         e._id_placeholder = '1' | ||||
|  | ||||
|         payload = get_attributes.GetAttributesRequestPayload() | ||||
|  | ||||
|         response_payload = e._process_get_attributes(payload) | ||||
|         e._data_session.commit() | ||||
|         e._data_session = e._data_store_session_factory() | ||||
|  | ||||
|         e._logger.info.assert_any_call( | ||||
|             "Processing operation: GetAttributes" | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             '1', | ||||
|             response_payload.unique_identifier | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             8, | ||||
|             len(response_payload.attributes) | ||||
|         ) | ||||
|  | ||||
|         attribute_factory = factory.AttributeFactory() | ||||
|  | ||||
|         attribute = attribute_factory.create_attribute( | ||||
|             enums.AttributeType.OBJECT_TYPE, | ||||
|             enums.ObjectType.SYMMETRIC_KEY | ||||
|         ) | ||||
|         self.assertIn(attribute, response_payload.attributes) | ||||
|  | ||||
|         attribute = attribute_factory.create_attribute( | ||||
|             enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, | ||||
|             enums.CryptographicAlgorithm.AES | ||||
|         ) | ||||
|         self.assertIn(attribute, response_payload.attributes) | ||||
|  | ||||
|         attribute = attribute_factory.create_attribute( | ||||
|             enums.AttributeType.CRYPTOGRAPHIC_LENGTH, | ||||
|             0 | ||||
|         ) | ||||
|         self.assertIn(attribute, response_payload.attributes) | ||||
|  | ||||
|         attribute = attribute_factory.create_attribute( | ||||
|             enums.AttributeType.OPERATION_POLICY_NAME, | ||||
|             'default' | ||||
|         ) | ||||
|         self.assertIn(attribute, response_payload.attributes) | ||||
|  | ||||
|         attribute = attribute_factory.create_attribute( | ||||
|             enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, | ||||
|             [] | ||||
|         ) | ||||
|         self.assertIn(attribute, response_payload.attributes) | ||||
|  | ||||
|         attribute = attribute_factory.create_attribute( | ||||
|             enums.AttributeType.STATE, | ||||
|             enums.State.PRE_ACTIVE | ||||
|         ) | ||||
|         self.assertIn(attribute, response_payload.attributes) | ||||
|  | ||||
|         attribute = attribute_factory.create_attribute( | ||||
|             enums.AttributeType.UNIQUE_IDENTIFIER, | ||||
|             '1' | ||||
|         ) | ||||
|         self.assertIn(attribute, response_payload.attributes) | ||||
|  | ||||
|     def test_get_attributes_not_allowed_by_policy(self): | ||||
|         """ | ||||
|         Test that an unallowed request is handled correctly by GetAttributes. | ||||
|         """ | ||||
|         e = engine.KmipEngine() | ||||
|         e._data_store = self.engine | ||||
|         e._data_store_session_factory = self.session_factory | ||||
|         e._data_session = e._data_store_session_factory() | ||||
|         e._logger = mock.MagicMock() | ||||
|         e._client_identity = 'test' | ||||
|  | ||||
|         obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) | ||||
|         obj_a._owner = 'admin' | ||||
|  | ||||
|         e._data_session.add(obj_a) | ||||
|         e._data_session.commit() | ||||
|         e._data_session = e._data_store_session_factory() | ||||
|  | ||||
|         id_a = str(obj_a.unique_identifier) | ||||
|         payload = get_attributes.GetAttributesRequestPayload( | ||||
|             unique_identifier=id_a | ||||
|         ) | ||||
|  | ||||
|         # Test by specifying the ID of the object whose attributes should | ||||
|         # be retrieved. | ||||
|         args = [payload] | ||||
|         self.assertRaisesRegex( | ||||
|             exceptions.ItemNotFound, | ||||
|             "Could not locate object: {0}".format(id_a), | ||||
|             e._process_get_attributes, | ||||
|             *args | ||||
|         ) | ||||
|  | ||||
|     def test_activate(self): | ||||
|         """ | ||||
|         Test that an Activate request can be processed correctly. | ||||
| @@ -3565,7 +4079,7 @@ class TestKmipEngine(testtools.TestCase): | ||||
|         e._logger.info.assert_called_once_with("Processing operation: Query") | ||||
|         self.assertIsInstance(result, query.QueryResponsePayload) | ||||
|         self.assertIsNotNone(result.operations) | ||||
|         self.assertEqual(6, len(result.operations)) | ||||
|         self.assertEqual(7, len(result.operations)) | ||||
|         self.assertEqual( | ||||
|             enums.Operation.CREATE, | ||||
|             result.operations[0].value | ||||
| @@ -3583,13 +4097,17 @@ class TestKmipEngine(testtools.TestCase): | ||||
|             result.operations[3].value | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             enums.Operation.DESTROY, | ||||
|             enums.Operation.GET_ATTRIBUTES, | ||||
|             result.operations[4].value | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             enums.Operation.QUERY, | ||||
|             enums.Operation.DESTROY, | ||||
|             result.operations[5].value | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             enums.Operation.QUERY, | ||||
|             result.operations[6].value | ||||
|         ) | ||||
|         self.assertEqual(list(), result.object_types) | ||||
|         self.assertIsNotNone(result.vendor_identification) | ||||
|         self.assertEqual( | ||||
| @@ -3608,7 +4126,7 @@ class TestKmipEngine(testtools.TestCase): | ||||
|  | ||||
|         e._logger.info.assert_called_once_with("Processing operation: Query") | ||||
|         self.assertIsNotNone(result.operations) | ||||
|         self.assertEqual(7, len(result.operations)) | ||||
|         self.assertEqual(8, len(result.operations)) | ||||
|         self.assertEqual( | ||||
|             enums.Operation.DISCOVER_VERSIONS, | ||||
|             result.operations[-1].value | ||||
|   | ||||
| @@ -112,3 +112,57 @@ class TestAttributePolicy(testtools.TestCase): | ||||
|  | ||||
|         result = rules.is_attribute_multivalued(attribute_b) | ||||
|         self.assertTrue(result) | ||||
|  | ||||
|     def test_get_all_attribute_names(self): | ||||
|         """ | ||||
|         Test that get_all_attribute_names returns a complete list of the | ||||
|         names of all spec-defined attributes. | ||||
|         """ | ||||
|         rules = policy.AttributePolicy(contents.ProtocolVersion.create(1, 0)) | ||||
|         attribute_names = [ | ||||
|             'Unique Identifier', | ||||
|             'Name', | ||||
|             'Object Type', | ||||
|             'Cryptographic Algorithm', | ||||
|             'Cryptographic Length', | ||||
|             'Cryptographic Parameters', | ||||
|             'Cryptographic Domain Parameters', | ||||
|             'Certificate Type', | ||||
|             'Certificate Length', | ||||
|             'X.509 Certificate Identifier', | ||||
|             'X.509 Certificate Subject', | ||||
|             'X.509 Certificate Issuer', | ||||
|             'Certificate Identifier', | ||||
|             'Certificate Subject', | ||||
|             'Certificate Issuer', | ||||
|             'Digital Signature Algorithm', | ||||
|             'Digest', | ||||
|             'Operation Policy Name', | ||||
|             'Cryptographic Usage Mask', | ||||
|             'Lease Time', | ||||
|             'Usage Limits', | ||||
|             'State', | ||||
|             'Initial Date', | ||||
|             'Activation Date', | ||||
|             'Process Start Date', | ||||
|             'Protect Stop Date', | ||||
|             'Deactivation Date', | ||||
|             'Destroy Date', | ||||
|             'Compromise Occurrence Date', | ||||
|             'Compromise Date', | ||||
|             'Revocation Reason', | ||||
|             'Archive Date', | ||||
|             'Object Group', | ||||
|             'Fresh', | ||||
|             'Link', | ||||
|             'Application Specific Information', | ||||
|             'Contact Information', | ||||
|             'Last Change Date', | ||||
|             'Custom Attribute' | ||||
|         ] | ||||
|  | ||||
|         result = rules.get_all_attribute_names() | ||||
|  | ||||
|         self.assertEqual(len(attribute_names), len(result)) | ||||
|         for attribute_name in attribute_names: | ||||
|             self.assertIn(attribute_name, result) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Peter Hamilton
					Peter Hamilton