diff --git a/kmip/core/enums.py b/kmip/core/enums.py index c481f65..f01b069 100644 --- a/kmip/core/enums.py +++ b/kmip/core/enums.py @@ -458,6 +458,15 @@ class KeyRoleType(Enum): PVKOTH = 0x00000015 +# 9.1.3.2.24 +class QueryFunction(Enum): + QUERY_OPERATIONS = 0x00000001 + QUERY_OBJECTS = 0x00000002 + QUERY_SERVER_INFORMATION = 0x00000003 + QUERY_APPLICATION_NAMESPACES = 0x00000004 + QUERY_EXTENSION_LIST = 0x00000005 + QUERY_EXTENSION_MAP = 0x00000006 + # 9.1.3.2.27 class Operation(Enum): CREATE = 0x00000001 diff --git a/kmip/core/factories/payloads/request.py b/kmip/core/factories/payloads/request.py index adc9958..41e5b6c 100644 --- a/kmip/core/factories/payloads/request.py +++ b/kmip/core/factories/payloads/request.py @@ -21,6 +21,7 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -48,5 +49,8 @@ class RequestPayloadFactory(PayloadFactory): def _create_destroy_payload(self): return destroy.DestroyRequestPayload() + def _create_query_payload(self): + return query.QueryRequestPayload() + def _create_discover_versions_payload(self): return discover_versions.DiscoverVersionsRequestPayload() diff --git a/kmip/core/factories/payloads/response.py b/kmip/core/factories/payloads/response.py index 69a7c31..88e0a49 100644 --- a/kmip/core/factories/payloads/response.py +++ b/kmip/core/factories/payloads/response.py @@ -21,6 +21,7 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -48,5 +49,8 @@ class ResponsePayloadFactory(PayloadFactory): def _create_destroy_payload(self): return destroy.DestroyResponsePayload() + def _create_query_payload(self): + return query.QueryResponsePayload() + def _create_discover_versions_payload(self): return discover_versions.DiscoverVersionsResponsePayload() diff --git a/kmip/core/messages/payloads/query.py b/kmip/core/messages/payloads/query.py new file mode 100644 index 0000000..248111d --- /dev/null +++ b/kmip/core/messages/payloads/query.py @@ -0,0 +1,339 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from six.moves import xrange + +from kmip.core.attributes import ApplicationNamespace +from kmip.core.attributes import ObjectType + +from kmip.core.enums import Tags +from kmip.core.messages.contents import Operation + +from kmip.core.misc import QueryFunction +from kmip.core.misc import ServerInformation +from kmip.core.misc import VendorIdentification + +from kmip.core.objects import ExtensionInformation +from kmip.core.primitives import Struct +from kmip.core.utils import BytearrayStream + + +class QueryRequestPayload(Struct): + """ + A request payload for the Query operation. + + The payload contains a list of query functions that the KMIP server should + respond to. See Section 4.25 of the KMIP 1.1 specification for more + information. + + Attributes: + query_functions: A list of QueryFunction enumerations. + """ + def __init__(self, query_functions=None): + """ + Construct a QueryRequestPayload object. + + Args: + query_functions (list): A list of QueryFunction enumerations. + """ + super(QueryRequestPayload, self).__init__(Tags.REQUEST_PAYLOAD) + + if query_functions is None: + self.query_functions = list() + else: + self.query_functions = query_functions + + self.validate() + + def read(self, istream): + """ + Read the data encoding the QueryRequestPayload object and decode it + into its constituent parts. + + Args: + istream (Stream): A data stream containing encoded object data, + supporting a read method; usually a BytearrayStream object. + """ + super(QueryRequestPayload, self).read(istream) + tstream = BytearrayStream(istream.read(self.length)) + + while(self.is_tag_next(Tags.QUERY_FUNCTION, tstream)): + query_function = QueryFunction() + query_function.read(tstream) + self.query_functions.append(query_function) + + self.is_oversized(tstream) + self.validate() + + def write(self, ostream): + """ + Write the data encoding the QueryRequestPayload object to a stream. + + Args: + ostream (Stream): A data stream in which to encode object data, + supporting a write method; usually a BytearrayStream object. + """ + tstream = BytearrayStream() + + for query_function in self.query_functions: + query_function.write(tstream) + + self.length = tstream.length() + super(QueryRequestPayload, self).write(ostream) + ostream.write(tstream.buffer) + + def validate(self): + """ + Error check the attributes of the QueryRequestPayload object. + """ + self.__validate() + + def __validate(self): + if isinstance(self.query_functions, list): + for i in xrange(len(self.query_functions)): + query_function = self.query_functions[i] + if not isinstance(query_function, QueryFunction): + msg = "invalid query function ({0} in list)".format(i) + msg += "; expected {0}, received {1}".format( + QueryFunction, query_function) + raise TypeError(msg) + else: + msg = "invalid query functions list" + msg += "; expected {0}, received {1}".format( + list, self.query_functions) + raise TypeError(msg) + + +class QueryResponsePayload(Struct): + """ + A response payload for the Query operation. + + The payload contains different sets of responses that the KMIP server + provides in response to the initial Query request. See Section 4.25 of the + KMIP 1.1 specification for more information. + + Attributes: + operations: A list of Operations supported by the server. + object_types: A list of ObjectTypes supported by the server. + vendor_identification: A string identifying the server vendor. + server_information: A structure containing vendor-specific fields and + substructures. + application_namespaces: A list of application namespaces supported by + the server. + extension_information: A list of ExtensionInformation objects detailing + Objects supported by the server with ItemTag values in the + Extensions range. + """ + def __init__(self, operations=None, object_types=None, + vendor_identification=None, server_information=None, + application_namespaces=None, extension_information=None): + """ + Construct a QueryResponsePayload object. + + Args: + operations (list): A list of Operations supported by the server. + object_types (list): A list of ObjectTypes supported by the server. + vendor_identification (VendorIdentification): A string identifying + the server vendor. + server_information (ServerInformation): A structure containing + vendor-specific fields and substructures. + application_namespaces (list): A list of application namespaces + supported by the server. + extension_information (list): A list of ExtensionInformation + objects detailing Objects supported by the server with ItemTag + values in the Extensions range. + """ + super(QueryResponsePayload, self).__init__(Tags.RESPONSE_PAYLOAD) + + if operations is None: + self.operations = list() + else: + self.operations = operations + + if object_types is None: + self.object_types = list() + else: + self.object_types = object_types + + self.vendor_identification = vendor_identification + self.server_information = server_information + + if application_namespaces is None: + self.application_namespaces = list() + else: + self.application_namespaces = application_namespaces + + if extension_information is None: + self.extension_information = list() + else: + self.extension_information = extension_information + + self.validate() + + def read(self, istream): + """ + Read the data encoding the QueryResponsePayload object and decode it + into its constituent parts. + + Args: + istream (Stream): A data stream containing encoded object data, + supporting a read method; usually a BytearrayStream object. + """ + super(QueryResponsePayload, self).read(istream) + tstream = BytearrayStream(istream.read(self.length)) + + while(self.is_tag_next(Tags.OPERATION, tstream)): + operation = Operation() + operation.read(tstream) + self.operations.append(operation) + + while(self.is_tag_next(Tags.OBJECT_TYPE, tstream)): + object_type = ObjectType() + object_type.read(tstream) + self.object_types.append(object_type) + + if self.is_tag_next(Tags.VENDOR_IDENTIFICATION, tstream): + self.vendor_identification = VendorIdentification() + self.vendor_identification.read(tstream) + + if self.is_tag_next(Tags.SERVER_INFORMATION, tstream): + self.server_information = ServerInformation() + self.server_information.read(tstream) + + while(self.is_tag_next(Tags.APPLICATION_NAMESPACE, tstream)): + application_namespace = ApplicationNamespace() + application_namespace.read(tstream) + self.application_namespaces.append(application_namespace) + + while(self.is_tag_next(Tags.EXTENSION_INFORMATION, tstream)): + extension_information = ExtensionInformation() + extension_information.read(tstream) + self.extension_information.append(extension_information) + + self.is_oversized(tstream) + self.validate() + + def write(self, ostream): + """ + Write the data encoding the QueryResponsePayload object to a stream. + + Args: + ostream (Stream): A data stream in which to encode object data, + supporting a write method; usually a BytearrayStream object. + """ + tstream = BytearrayStream() + + for operation in self.operations: + operation.write(tstream) + + for object_type in self.object_types: + object_type.write(tstream) + + if self.vendor_identification is not None: + self.vendor_identification.write(tstream) + + if self.server_information is not None: + self.server_information.write(tstream) + + for application_namespace in self.application_namespaces: + application_namespace.write(tstream) + + for extension_information in self.extension_information: + extension_information.write(tstream) + + self.length = tstream.length() + super(QueryResponsePayload, self).write(ostream) + ostream.write(tstream.buffer) + + def validate(self): + """ + Error check the attributes of the QueryRequestPayload object. + """ + self.__validate() + + def __validate(self): + # TODO (peter-hamilton) Add separate validate_list function for this + if isinstance(self.operations, list): + for i in xrange(len(self.operations)): + operation = self.operations[i] + if not isinstance(operation, Operation): + msg = "invalid operation ({0} in list)".format(i) + msg += "; expected {0}, received {1}".format( + Operation, operation) + raise TypeError(msg) + else: + msg = "invalid operations list" + msg += "; expected {0}, received {1}".format( + list, self.operations) + raise TypeError(msg) + + if isinstance(self.object_types, list): + for i in xrange(len(self.object_types)): + object_type = self.object_types[i] + if not isinstance(object_type, ObjectType): + msg = "invalid object type ({0} in list)".format(i) + msg += "; expected {0}, received {1}".format( + ObjectType, object_type) + raise TypeError(msg) + else: + msg = "invalid object types list" + msg += "; expected {0}, received {1}".format( + list, self.object_types) + raise TypeError(msg) + + if self.vendor_identification is not None: + if not isinstance(self.vendor_identification, + VendorIdentification): + msg = "invalid vendor identification" + msg += "; expected {0}, received {1}".format( + VendorIdentification, self.vendor_identification) + raise TypeError(msg) + + if self.server_information is not None: + if not isinstance(self.server_information, ServerInformation): + msg = "invalid server information" + msg += "; expected {0}, received {1}".format( + ServerInformation, self.server_information) + raise TypeError(msg) + + if isinstance(self.application_namespaces, list): + for i in xrange(len(self.application_namespaces)): + application_namespace = self.application_namespaces[i] + if not isinstance(application_namespace, ApplicationNamespace): + msg = "invalid application namespace ({0} in list)".format( + i) + msg += "; expected {0}, received {1}".format( + ApplicationNamespace, application_namespace) + raise TypeError(msg) + else: + msg = "invalid application namespaces list" + msg += "; expected {0}, received {1}".format( + list, self.application_namespaces) + raise TypeError(msg) + + if isinstance(self.extension_information, list): + for i in xrange(len(self.extension_information)): + extension_information = self.extension_information[i] + if not isinstance(extension_information, ExtensionInformation): + msg = "invalid extension information ({0} in list)".format( + i) + msg += "; expected {0}, received {1}".format( + ExtensionInformation, extension_information) + raise TypeError(msg) + else: + msg = "invalid extension information list" + msg += "; expected {0}, received {1}".format( + list, self.extension_information) + raise TypeError(msg) diff --git a/kmip/core/misc.py b/kmip/core/misc.py index 3014864..69cf936 100644 --- a/kmip/core/misc.py +++ b/kmip/core/misc.py @@ -13,11 +13,172 @@ # License for the specific language governing permissions and limitations # under the License. -from kmip.core import enums -from kmip.core import primitives +from kmip.core.enums import Tags +from kmip.core.enums import QueryFunction as QueryFunctionEnum + +from kmip.core.primitives import Enumeration +from kmip.core.primitives import Interval +from kmip.core.primitives import Struct +from kmip.core.primitives import TextString + +from kmip.core.utils import BytearrayStream -class Offset(primitives.Interval): +class Offset(Interval): + """ + An integer representing a positive change in time. - def __init__(self, value=None, tag=enums.Tags.OFFSET): - super(Offset, self).__init__(value, tag) + Used by Rekey and Recertify requests to indicate the time difference + between the InitializationDate and the ActivationDate of the replacement + item to be created. See Sections 4.4, 4.5, and 4.8 of the KMIP v1.1 + specification for more information. + """ + + def __init__(self, value=None): + """ + Construct an Offset object. + + Args: + value (int): An integer representing a positive change in time. + Optional, defaults to None. + """ + super(Offset, self).__init__(value, Tags.OFFSET) + + +class QueryFunction(Enumeration): + """ + An encodeable wrapper for the QueryFunction enumeration. + + Used by Query requests to specify the information to retrieve from the + KMIP server. See Sections 4.25 and 9.1.3.2.24 of the KMIP v1.1 + specification for more information. + """ + ENUM_TYPE = QueryFunctionEnum + + def __init__(self, value=None): + """ + Construct a QueryFunction object. + + Args: + value (QueryFunction enum): A QueryFunction enumeration value, + (e.g., QueryFunction.QUERY_OPERATIONS). Optional, default to + None. + """ + super(QueryFunction, self).__init__(value, Tags.QUERY_FUNCTION) + + +class VendorIdentification(TextString): + """ + A text string uniquely identifying a KMIP vendor. + + Returned by KMIP servers upon receipt of a Query request for server + information. See Section 4.25 of the KMIP v1.1. specification for more + information. + """ + + def __init__(self, value=None): + """ + Construct a VendorIdentification object. + + Args: + value (str): A string describing a KMIP vendor. Optional, defaults + to None. + """ + super(VendorIdentification, self).__init__( + value, Tags.VENDOR_IDENTIFICATION) + + +class ServerInformation(Struct): + """ + A structure containing vendor-specific fields and/or substructures. + + Returned by KMIP servers upon receipt of a Query request for server + information. See Section 4.25 of the KMIP v1.1 specification for more + information. + + Note: + There are no example structures nor data encodings in the KMIP + documentation of this object. Therefore this class handles encoding and + decoding its data in a generic way, using a BytearrayStream for primary + storage. The intent is for vendor-specific subclasses to decide how to + decode this data from the stream attribute. Likewise, these subclasses + must decide how to encode their data into the stream attribute. There + are no arguments to the constructor and therefore no means by which to + validate the object's contents. + """ + + def __init__(self): + """ + Construct a ServerInformation object. + """ + super(ServerInformation, self).__init__(Tags.SERVER_INFORMATION) + + self.data = BytearrayStream() + + self.validate() + + def read(self, istream): + """ + Read the data encoding the ServerInformation object and decode it into + its constituent parts. + + Args: + istream (Stream): A data stream containing encoded object data, + supporting a read method; usually a BytearrayStream object. + """ + super(ServerInformation, self).read(istream) + tstream = BytearrayStream(istream.read(self.length)) + + self.data = BytearrayStream(tstream.read()) + + self.is_oversized(tstream) + self.validate() + + def write(self, ostream): + """ + Write the data encoding the ServerInformation object to a stream. + + Args: + ostream (Stream): A data stream in which to encode object data, + supporting a write method; usually a BytearrayStream object. + """ + tstream = BytearrayStream() + tstream.write(self.data.buffer) + + self.length = tstream.length() + super(ServerInformation, self).write(ostream) + ostream.write(tstream.buffer) + + def validate(self): + """ + Error check the types of the different parts of the ServerInformation + object. + """ + self.__validate() + + def __validate(self): + # NOTE (peter-hamilton): Intentional pass, no way to validate data. + pass + + def __eq__(self, other): + if isinstance(other, ServerInformation): + if len(self.data) != len(other.data): + return False + elif self.data != other.data: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, ServerInformation): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + return "ServerInformation()" + + def __str__(self): + return str(self.data) diff --git a/kmip/core/objects.py b/kmip/core/objects.py index 5d270c6..597cdaf 100644 --- a/kmip/core/objects.py +++ b/kmip/core/objects.py @@ -921,3 +921,244 @@ class PublicKeyTemplateAttribute(TemplateAttribute): attributes=None): super(PublicKeyTemplateAttribute, self).__init__( names, attributes, Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE) + + +# 2.1.9 +class ExtensionName(TextString): + """ + The name of an extended Object. + + A part of ExtensionInformation, specifically identifying an Object that is + a custom vendor addition to the KMIP specification. See Section 2.1.9 of + the KMIP 1.1 specification for more information. + + Attributes: + value: The string data representing the extension name. + """ + def __init__(self, value=''): + """ + Construct an ExtensionName object. + + Args: + value (str): The string data representing the extension name. + Optional, defaults to the empty string. + """ + super(ExtensionName, self).__init__(value, Tags.EXTENSION_NAME) + + +class ExtensionTag(Integer): + """ + The tag of an extended Object. + + A part of ExtensionInformation. See Section 2.1.9 of the KMIP 1.1 + specification for more information. + + Attributes: + value: The tag number identifying the extended object. + """ + def __init__(self, value=0): + """ + Construct an ExtensionTag object. + + Args: + value (int): A number representing the extension tag. Often + displayed in hex format. Optional, defaults to 0. + """ + super(ExtensionTag, self).__init__(value, Tags.EXTENSION_TAG) + + +class ExtensionType(Integer): + """ + The type of an extended Object. + + A part of ExtensionInformation, specifically identifying the type of the + Object in the specification extension. See Section 2.1.9 of the KMIP 1.1 + specification for more information. + + Attributes: + value: The type enumeration for the extended object. + """ + def __init__(self, value=None): + """ + Construct an ExtensionType object. + + Args: + value (Types): A number representing a Types enumeration value, + indicating the type of the extended Object. Optional, defaults + to None. + """ + super(ExtensionType, self).__init__(value, Tags.EXTENSION_TYPE) + + +class ExtensionInformation(Struct): + """ + A structure describing Objects defined in KMIP specification extensions. + + It is used specifically for Objects with Item Tag values in the Extensions + range and appears in responses to Query requests for server extension + information. See Sections 2.1.9 and 4.25 of the KMIP 1.1 specification for + more information. + + Attributes: + extension_name: The name of the extended Object. + extension_tag: The tag of the extended Object. + extension_type: The type of the extended Object. + """ + def __init__(self, extension_name=None, extension_tag=None, + extension_type=None): + """ + Construct an ExtensionInformation object. + + Args: + extension_name (ExtensionName): The name of the extended Object. + extension_tag (ExtensionTag): The tag of the extended Object. + extension_type (ExtensionType): The type of the extended Object. + """ + super(ExtensionInformation, self).__init__(Tags.EXTENSION_INFORMATION) + + if extension_name is None: + self.extension_name = ExtensionName() + else: + self.extension_name = extension_name + + self.extension_tag = extension_tag + self.extension_type = extension_type + + self.validate() + + def read(self, istream): + """ + Read the data encoding the ExtensionInformation object and decode it + into its constituent parts. + + Args: + istream (Stream): A data stream containing encoded object data, + supporting a read method; usually a BytearrayStream object. + """ + super(ExtensionInformation, self).read(istream) + tstream = BytearrayStream(istream.read(self.length)) + + self.extension_name.read(tstream) + + if self.is_tag_next(Tags.EXTENSION_TAG, tstream): + self.extension_tag = ExtensionTag() + self.extension_tag.read(tstream) + if self.is_tag_next(Tags.EXTENSION_TYPE, tstream): + self.extension_type = ExtensionType() + self.extension_type.read(tstream) + + self.is_oversized(tstream) + self.validate() + + def write(self, ostream): + """ + Write the data encoding the ExtensionInformation object to a stream. + + Args: + ostream (Stream): A data stream in which to encode object data, + supporting a write method; usually a BytearrayStream object. + """ + tstream = BytearrayStream() + + self.extension_name.write(tstream) + + if self.extension_tag is not None: + self.extension_tag.write(tstream) + if self.extension_type is not None: + self.extension_type.write(tstream) + + self.length = tstream.length() + super(ExtensionInformation, self).write(ostream) + ostream.write(tstream.buffer) + + def validate(self): + """ + Error check the attributes of the ExtensionInformation object. + """ + self.__validate() + + def __validate(self): + if not isinstance(self.extension_name, ExtensionName): + msg = "invalid extension name" + msg += "; expected {0}, received {1}".format( + ExtensionName, self.extension_name) + raise TypeError(msg) + + if self.extension_tag is not None: + if not isinstance(self.extension_tag, ExtensionTag): + msg = "invalid extension tag" + msg += "; expected {0}, received {1}".format( + ExtensionTag, self.extension_tag) + raise TypeError(msg) + + if self.extension_type is not None: + if not isinstance(self.extension_type, ExtensionType): + msg = "invalid extension type" + msg += "; expected {0}, received {1}".format( + ExtensionType, self.extension_type) + raise TypeError(msg) + + def __eq__(self, other): + if isinstance(other, ExtensionInformation): + if self.extension_name != other.extension_name: + return False + elif self.extension_tag != other.extension_tag: + return False + elif self.extension_type != other.extension_type: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, ExtensionInformation): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + name = "extension_name={0}".format(repr(self.extension_name)) + tag = "extension_tag={0}".format(repr(self.extension_tag)) + typ = "extension_type={0}".format(repr(self.extension_type)) + return "ExtensionInformation({0}, {1}, {2})".format(name, tag, typ) + + def __str__(self): + return repr(self) + + @classmethod + def create(cls, extension_name=None, extension_tag=None, + extension_type=None): + """ + Construct an ExtensionInformation object from provided extension + values. + + Args: + extension_name (str): The name of the extension. Optional, + defaults to None. + extension_tag (int): The tag number of the extension. Optional, + defaults to None. + extension_type (int): The type index of the extension. Optional, + defaults to None. + + Returns: + ExtensionInformation: The newly created set of extension + information. + + Example: + >>> x = ExtensionInformation.create('extension', 1, 1) + >>> x.extension_name.value + ExtensionName(value='extension') + >>> x.extension_tag.value + ExtensionTag(value=1) + >>> x.extension_type.value + ExtensionType(value=1) + """ + extension_name = ExtensionName(extension_name) + extension_tag = ExtensionTag(extension_tag) + extension_type = ExtensionType(extension_type) + + return ExtensionInformation( + extension_name=extension_name, + extension_tag=extension_tag, + extension_type=extension_type) diff --git a/kmip/core/primitives.py b/kmip/core/primitives.py index 08abfa4..d4da2a5 100644 --- a/kmip/core/primitives.py +++ b/kmip/core/primitives.py @@ -156,8 +156,9 @@ class Struct(Base): def __init__(self, tag=Tags.DEFAULT): super(Struct, self).__init__(tag, type=Types.STRUCTURE) + # NOTE (peter-hamilton) If seen, should indicate repr needs to be defined def __repr__(self): - return '' + return "Struct()" class Integer(Base): @@ -216,7 +217,10 @@ class Integer(Base): num_bytes) def __repr__(self): - return '' % (self.value) + return "{0}(value={1})".format(type(self).__name__, repr(self.value)) + + def __str__(self): + return "{0}".format(repr(self.value)) def __eq__(self, other): if isinstance(other, Integer): @@ -413,7 +417,11 @@ class Enumeration(Integer): Enum, type(self.enum))) def __repr__(self): - return '' % (self.enum.name, self.enum.value) + return "Enumeration(value={0})".format(self.enum) + + def __str__(self): + return "{0} - {1} - {2}".format( + type(self.enum), self.enum.name, self.enum.value) class Boolean(Base): @@ -550,7 +558,10 @@ class TextString(Base): data_type)) def __repr__(self): - return '' % (self.value) + return "{0}(value={1})".format(type(self).__name__, repr(self.value)) + + def __str__(self): + return "{0}".format(repr(self.value)) def __eq__(self, other): if isinstance(other, TextString): diff --git a/kmip/core/utils.py b/kmip/core/utils.py index adb4ca2..41b3882 100644 --- a/kmip/core/utils.py +++ b/kmip/core/utils.py @@ -66,7 +66,7 @@ def build_er_error(class_object, descriptor, expected, received, class BytearrayStream(io.RawIOBase): def __init__(self, data=None): if data is None: - self.buffer = b'' + self.buffer = bytes() else: self.buffer = bytes(data) @@ -81,10 +81,11 @@ class BytearrayStream(io.RawIOBase): return data def readall(self): - data = self.buffer[0:] - self.buffer = self.buffer[len(self.buffer):] + data = self.buffer + self.buffer = bytes() return data + # TODO (peter-hamilton) Unused, add documentation or cut. def readinto(self, b): if len(b) <= len(self.buffer): num_bytes_to_read = len(b) @@ -117,6 +118,17 @@ class BytearrayStream(io.RawIOBase): def __eq__(self, other): if isinstance(other, BytearrayStream): - return (self.buffer == other.buffer) + if len(self.buffer) != len(other.buffer): + return False + elif self.buffer != other.buffer: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, BytearrayStream): + return not (self == other) else: return NotImplemented diff --git a/kmip/demos/units/query.py b/kmip/demos/units/query.py new file mode 100644 index 0000000..8065385 --- /dev/null +++ b/kmip/demos/units/query.py @@ -0,0 +1,110 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +from six.moves import xrange + +from kmip.core.enums import Operation +from kmip.core.enums import QueryFunction as QueryFunctionEnum +from kmip.core.enums import ResultStatus + +from kmip.core.misc import QueryFunction + +from kmip.demos import utils + +from kmip.services.kmip_client import KMIPProxy + + +if __name__ == '__main__': + # Build and parse arguments + parser = utils.build_cli_parser(Operation.QUERY) + opts, args = parser.parse_args(sys.argv[1:]) + + username = opts.username + password = opts.password + + # Build and setup logging and needed factories + f_log = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, + 'logconfig.ini') + logging.config.fileConfig(f_log) + logger = logging.getLogger(__name__) + + # Build query function list. + query_functions = list() + query_functions.append( + QueryFunction(QueryFunctionEnum.QUERY_OPERATIONS)) + query_functions.append( + QueryFunction(QueryFunctionEnum.QUERY_OBJECTS)) + query_functions.append( + QueryFunction(QueryFunctionEnum.QUERY_SERVER_INFORMATION)) + query_functions.append( + QueryFunction(QueryFunctionEnum.QUERY_APPLICATION_NAMESPACES)) + query_functions.append( + QueryFunction(QueryFunctionEnum.QUERY_EXTENSION_LIST)) + query_functions.append( + QueryFunction(QueryFunctionEnum.QUERY_EXTENSION_MAP)) + + # Build the client and connect to the server + client = KMIPProxy() + client.open() + + result = client.query(query_functions=query_functions) + client.close() + + # Display operation results + logger.info('query() result status: {0}'.format( + result.result_status.enum)) + + if result.result_status.enum == ResultStatus.SUCCESS: + operations = result.operations + object_types = result.object_types + vendor_identification = result.vendor_identification + server_information = result.server_information + application_namespaces = result.application_namespaces + extension_information = result.extension_information + + logger.info('number of operations supported: {0}'.format( + len(operations))) + for i in xrange(len(operations)): + logger.info('operation supported: {0}'.format(operations[i])) + + logger.info('number of object types supported: {0}'.format( + len(object_types))) + for i in xrange(len(object_types)): + logger.info('object type supported: {0}'.format(object_types[i])) + + logger.info('vendor identification: {0}'.format(vendor_identification)) + logger.info('server information: {0}'.format(server_information)) + + logger.info('number of application namespaces supported: {0}'.format( + len(application_namespaces))) + for i in xrange(len(application_namespaces)): + logger.info('application namespace supported: {0}'.format( + application_namespaces[i])) + + logger.info('number of extensions supported: {0}'.format( + len(extension_information))) + for i in xrange(len(extension_information)): + logger.info('extension supported: {0}'.format( + extension_information[i])) + + else: + logger.info('query() result reason: {0}'.format( + result.result_reason.enum)) + logger.info('query() result message: {0}'.format( + result.result_message.value)) diff --git a/kmip/demos/utils.py b/kmip/demos/utils.py index f678c4a..77f45cb 100644 --- a/kmip/demos/utils.py +++ b/kmip/demos/utils.py @@ -119,6 +119,8 @@ def build_cli_parser(operation): default=None, dest="length", help="Key length in bits (e.g., 128, 256)") + elif operation is Operation.QUERY: + pass elif operation is Operation.DISCOVER_VERSIONS: pass else: diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index 8e037b5..5ed6aa1 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -19,6 +19,7 @@ from kmip.services.results import DestroyResult from kmip.services.results import DiscoverVersionsResult from kmip.services.results import GetResult from kmip.services.results import LocateResult +from kmip.services.results import QueryResult from kmip.services.results import RegisterResult from kmip.services.results import RekeyKeyPairResult @@ -45,6 +46,7 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -176,6 +178,31 @@ class KMIPProxy(KMIP): object_group_member=object_group_member, attributes=attributes, credential=credential) + def query(self, batch=False, query_functions=None, credential=None): + """ + Send a Query request to the server. + + Args: + batch (boolean): A flag indicating if the operation should be sent + with a batch of additional operations. Defaults to False. + query_functions (list): A list of QueryFunction enumerations + indicating what information the client wants from the server. + Optional, defaults to None. + credential (Credential): A Credential object containing + authentication information for the server. Optional, defaults + to None. + """ + batch_item = self._build_query_batch_item(query_functions) + + # TODO (peter-hamilton): Replace this with official client batch mode. + if batch: + self.batch_items.append(batch_item) + else: + request = self._build_request_message(credential, [batch_item]) + response = self._send_and_receive_message(request) + results = self._process_batch_items(response) + return results[0] + def discover_versions(self, batch=False, protocol_versions=None, credential=None): batch_item = self._build_discover_versions_batch_item( @@ -257,6 +284,13 @@ class KMIPProxy(KMIP): operation=operation, request_payload=payload) return batch_item + def _build_query_batch_item(self, query_functions=None): + operation = Operation(OperationEnum.QUERY) + payload = query.QueryRequestPayload(query_functions) + batch_item = messages.RequestBatchItem( + operation=operation, request_payload=payload) + return batch_item + def _build_discover_versions_batch_item(self, protocol_versions=None): operation = Operation(OperationEnum.DISCOVER_VERSIONS) @@ -281,6 +315,8 @@ class KMIPProxy(KMIP): return self._process_create_key_pair_batch_item elif operation == OperationEnum.REKEY_KEY_PAIR: return self._process_rekey_key_pair_batch_item + elif operation == OperationEnum.QUERY: + return self._process_query_batch_item elif operation == OperationEnum.DISCOVER_VERSIONS: return self._process_discover_versions_batch_item else: @@ -317,6 +353,35 @@ class KMIPProxy(KMIP): return self._process_key_pair_batch_item( batch_item, RekeyKeyPairResult) + def _process_query_batch_item(self, batch_item): + payload = batch_item.response_payload + + operations = None + object_types = None + vendor_identification = None + server_information = None + application_namespaces = None + extension_information = None + + if payload is not None: + operations = payload.operations + object_types = payload.object_types + vendor_identification = payload.vendor_identification + server_information = payload.server_information + application_namespaces = payload.application_namespaces + extension_information = payload.extension_information + + return QueryResult( + batch_item.result_status, + batch_item.result_reason, + batch_item.result_message, + operations, + object_types, + vendor_identification, + server_information, + application_namespaces, + extension_information) + def _process_discover_versions_batch_item(self, batch_item): payload = batch_item.response_payload diff --git a/kmip/services/results.py b/kmip/services/results.py index bdf754b..d68495e 100644 --- a/kmip/services/results.py +++ b/kmip/services/results.py @@ -174,6 +174,60 @@ class LocateResult(OperationResult): self.uuids = uuids +class QueryResult(OperationResult): + """ + A container for the results of a Query operation. + + Attributes: + result_status: The status of the Query operation (e.g., success or + failure). + result_reason: The reason for the operation status. + result_message: Extra information pertaining to the status reason. + operations: A list of Operations supported by the server. + object_types: A list of Object Types supported by the server. + vendor_identification: + server_information: + application_namespaces: A list of namespaces supported by the server. + extension_information: A list of extensions supported by the server. + """ + + def __init__(self, + result_status, + result_reason=None, + result_message=None, + operations=None, + object_types=None, + vendor_identification=None, + server_information=None, + application_namespaces=None, + extension_information=None): + super(QueryResult, self).__init__( + result_status, result_reason, result_message) + + if operations is None: + self.operations = list() + else: + self.operations = operations + + if object_types is None: + self.object_types = list() + else: + self.object_types = object_types + + self.vendor_identification = vendor_identification + self.server_information = server_information + + if application_namespaces is None: + self.application_namespaces = list() + else: + self.application_namespaces = application_namespaces + + if extension_information is None: + self.extension_information = list() + else: + self.extension_information = extension_information + + class DiscoverVersionsResult(OperationResult): def __init__(self, diff --git a/kmip/tests/core/factories/payloads/test_request.py b/kmip/tests/core/factories/payloads/test_request.py index 9f2f2b7..7097d53 100644 --- a/kmip/tests/core/factories/payloads/test_request.py +++ b/kmip/tests/core/factories/payloads/test_request.py @@ -24,6 +24,7 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -138,8 +139,8 @@ class TestRequestPayloadFactory(testtools.TestCase): self.factory.create, Operation.VALIDATE) def test_create_query_payload(self): - self._test_not_implemented( - self.factory.create, Operation.QUERY) + payload = self.factory.create(Operation.QUERY) + self._test_payload_type(payload, query.QueryRequestPayload) def test_create_cancel_payload(self): self._test_not_implemented( diff --git a/kmip/tests/core/factories/payloads/test_response.py b/kmip/tests/core/factories/payloads/test_response.py index b47c31c..597accb 100644 --- a/kmip/tests/core/factories/payloads/test_response.py +++ b/kmip/tests/core/factories/payloads/test_response.py @@ -24,6 +24,7 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -138,8 +139,8 @@ class TestResponsePayloadFactory(testtools.TestCase): self.factory.create, Operation.VALIDATE) def test_create_query_payload(self): - self._test_not_implemented( - self.factory.create, Operation.QUERY) + payload = self.factory.create(Operation.QUERY) + self._test_payload_type(payload, query.QueryResponsePayload) def test_create_cancel_payload(self): self._test_not_implemented( diff --git a/kmip/tests/core/messages/payloads/test_query.py b/kmip/tests/core/messages/payloads/test_query.py new file mode 100644 index 0000000..6b39e39 --- /dev/null +++ b/kmip/tests/core/messages/payloads/test_query.py @@ -0,0 +1,587 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from six.moves import xrange + +from testtools import TestCase + +from kmip.core import utils + +from kmip.core.attributes import ObjectType + +from kmip.core.enums import ObjectType as ObjectTypeEnum +from kmip.core.enums import Operation as OperationEnum +from kmip.core.enums import QueryFunction as QueryFunctionEnum + +from kmip.core.messages.contents import Operation +from kmip.core.messages.payloads import query + +from kmip.core.misc import QueryFunction +from kmip.core.misc import VendorIdentification +from kmip.core.misc import ServerInformation + +from kmip.core.objects import ExtensionInformation +from kmip.core.objects import ExtensionName + + +class TestQueryRequestPayload(TestCase): + """ + Test suite for the QueryRequestPayload class. + + Test encodings obtained from Sections 12.1 and 12.2 of the KMIP 1.1 Test + Cases documentation. + """ + + def setUp(self): + super(TestQueryRequestPayload, self).setUp() + + self.query_functions_a = list() + self.query_functions_b = list() + self.query_functions_c = list() + + self.query_functions_b.append(QueryFunction( + QueryFunctionEnum.QUERY_OPERATIONS)) + self.query_functions_b.append(QueryFunction( + QueryFunctionEnum.QUERY_OBJECTS)) + self.query_functions_b.append(QueryFunction( + QueryFunctionEnum.QUERY_SERVER_INFORMATION)) + + self.query_functions_c.append(QueryFunction( + QueryFunctionEnum.QUERY_EXTENSION_LIST)) + + self.encoding_a = utils.BytearrayStream(( + b'\x42\x00\x79\x01\x00\x00\x00\x00')) + + self.encoding_b = utils.BytearrayStream(( + b'\x42\x00\x79\x01\x00\x00\x00\x30\x42\x00\x74\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x74\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x74\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x03\x00\x00\x00\x00')) + + self.encoding_c = utils.BytearrayStream(( + b'\x42\x00\x79\x01\x00\x00\x00\x10\x42\x00\x74\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x05\x00\x00\x00\x00')) + + def tearDown(self): + super(TestQueryRequestPayload, self).tearDown() + + def test_init_with_none(self): + """ + Test that a QueryRequestPayload object can be constructed with no + specified value. + """ + query.QueryRequestPayload() + + def test_init_with_args(self): + """ + Test that a QueryRequestPayload object can be constructed with valid + values. + """ + query.QueryRequestPayload(self.query_functions_a) + query.QueryRequestPayload(self.query_functions_b) + query.QueryRequestPayload(self.query_functions_c) + + def test_validate_with_invalid_query_functions_list(self): + """ + Test that a TypeError exception is raised when an invalid QueryFunction + list is used to construct a QueryRequestPayload object. + """ + kwargs = {'query_functions': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid query functions list", + query.QueryRequestPayload, **kwargs) + + def test_validate_with_invalid_query_functions_item(self): + """ + Test that a TypeError exception is raised when an invalid QueryFunction + item is used to construct a QueryRequestPayload object. + """ + kwargs = {'query_functions': ['invalid']} + self.assertRaisesRegexp( + TypeError, "invalid query function", + query.QueryRequestPayload, **kwargs) + + def _test_read(self, stream, query_functions): + payload = query.QueryRequestPayload() + payload.read(stream) + expected = len(query_functions) + observed = len(payload.query_functions) + + msg = "query functions list decoding mismatch" + msg += "; expected {0} results, received {1}".format( + expected, observed) + self.assertEqual(expected, observed, msg) + + for i in xrange(len(query_functions)): + expected = query_functions[i] + observed = payload.query_functions[i] + + msg = "query function decoding mismatch" + msg += "; expected {0}, received {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_read_with_empty_query_functions_list(self): + """ + Test that a QueryRequestPayload object with no data can be read from + a data stream. + """ + self._test_read(self.encoding_a, self.query_functions_a) + + def test_read_with_multiple_query_functions(self): + """ + Test that a QueryRequestPayload object with multiple pieces of data + can be read from a data stream. + """ + self._test_read(self.encoding_b, self.query_functions_b) + + def test_read_with_one_query_function(self): + """ + Test that a QueryRequestPayload object with a single piece of data can + be read from a data stream. + """ + self._test_read(self.encoding_c, self.query_functions_c) + + def _test_write(self, encoding, query_functions): + stream = utils.BytearrayStream() + payload = query.QueryRequestPayload(query_functions) + payload.write(stream) + + length_expected = len(encoding) + length_received = len(stream) + + msg = "encoding lengths not equal" + msg += "; expected {0}, received {1}".format( + length_expected, length_received) + self.assertEqual(length_expected, length_received, msg) + + msg = "encoding mismatch" + msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(encoding, stream) + + self.assertEqual(encoding, stream, msg) + + def test_write_with_empty_query_functions_list(self): + """ + Test that a QueryRequestPayload object with no data can be written to + a data stream. + """ + self._test_write(self.encoding_a, self.query_functions_a) + + def test_write_with_multiple_query_functions(self): + """ + Test that a QueryRequestPayload object with multiple pieces of data + can be written to a data stream. + """ + self._test_write(self.encoding_b, self.query_functions_b) + + def test_write_with_one_query_function(self): + """ + Test that a QueryRequestPayload object with a single piece of data can + be written to a data stream. + """ + self._test_write(self.encoding_c, self.query_functions_c) + + +class TestQueryResponsePayload(TestCase): + """ + Test encodings obtained from Sections 12.1 and 12.2 of the KMIP 1.1 Test + Cases documentation. + """ + + def setUp(self): + super(TestQueryResponsePayload, self).setUp() + + self.operations = list() + self.object_types = list() + self.application_namespaces = list() + self.extension_information = list() + + self.vendor_identification = VendorIdentification( + "IBM test server, not-TKLM 2.0.1.1 KMIP 2.0.0.1") + self.server_information = ServerInformation() + + self.operations.append(Operation(OperationEnum.CREATE)) + self.operations.append(Operation(OperationEnum.CREATE_KEY_PAIR)) + self.operations.append(Operation(OperationEnum.REGISTER)) + self.operations.append(Operation(OperationEnum.REKEY)) + self.operations.append(Operation(OperationEnum.CERTIFY)) + self.operations.append(Operation(OperationEnum.RECERTIFY)) + self.operations.append(Operation(OperationEnum.LOCATE)) + self.operations.append(Operation(OperationEnum.CHECK)) + self.operations.append(Operation(OperationEnum.GET)) + self.operations.append(Operation(OperationEnum.GET_ATTRIBUTES)) + self.operations.append(Operation(OperationEnum.GET_ATTRIBUTE_LIST)) + self.operations.append(Operation(OperationEnum.ADD_ATTRIBUTE)) + self.operations.append(Operation(OperationEnum.MODIFY_ATTRIBUTE)) + self.operations.append(Operation(OperationEnum.DELETE_ATTRIBUTE)) + self.operations.append(Operation(OperationEnum.OBTAIN_LEASE)) + self.operations.append(Operation(OperationEnum.GET_USAGE_ALLOCATION)) + self.operations.append(Operation(OperationEnum.ACTIVATE)) + self.operations.append(Operation(OperationEnum.REVOKE)) + self.operations.append(Operation(OperationEnum.DESTROY)) + self.operations.append(Operation(OperationEnum.ARCHIVE)) + self.operations.append(Operation(OperationEnum.RECOVER)) + self.operations.append(Operation(OperationEnum.QUERY)) + self.operations.append(Operation(OperationEnum.CANCEL)) + self.operations.append(Operation(OperationEnum.POLL)) + self.operations.append(Operation(OperationEnum.REKEY_KEY_PAIR)) + self.operations.append(Operation(OperationEnum.DISCOVER_VERSIONS)) + + self.object_types.append(ObjectType(ObjectTypeEnum.CERTIFICATE)) + self.object_types.append(ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)) + self.object_types.append(ObjectType(ObjectTypeEnum.PUBLIC_KEY)) + self.object_types.append(ObjectType(ObjectTypeEnum.PRIVATE_KEY)) + self.object_types.append(ObjectType(ObjectTypeEnum.TEMPLATE)) + self.object_types.append(ObjectType(ObjectTypeEnum.SECRET_DATA)) + + self.extension_information.append(ExtensionInformation( + extension_name=ExtensionName("ACME LOCATION"))) + self.extension_information.append(ExtensionInformation( + extension_name=ExtensionName("ACME ZIP CODE"))) + + self.encoding_a = utils.BytearrayStream(( + b'\x42\x00\x7C\x01\x00\x00\x00\x00')) + + self.encoding_b = utils.BytearrayStream(( + b'\x42\x00\x7C\x01\x00\x00\x02\x40\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x03\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x04\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x06\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x07\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x08\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x09\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x0A\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x0B\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x0C\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x0D\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x0E\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x0F\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x10\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x11\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x12\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x13\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x14\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x15\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x16\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x18\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x19\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x1A\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x1D\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x1E\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x03\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x04\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x06\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x07\x00\x00\x00\x00\x42\x00\x9D\x07\x00\x00\x00\x2E' + b'\x49\x42\x4D\x20\x74\x65\x73\x74\x20\x73\x65\x72\x76\x65\x72\x2C' + b'\x20\x6E\x6F\x74\x2D\x54\x4B\x4C\x4D\x20\x32\x2E\x30\x2E\x31\x2E' + b'\x31\x20\x4B\x4D\x49\x50\x20\x32\x2E\x30\x2E\x30\x2E\x31\x00\x00' + b'\x42\x00\x88\x01\x00\x00\x00\x00')) + + self.encoding_c = utils.BytearrayStream(( + b'\x42\x00\x7C\x01\x00\x00\x00\x40\x42\x00\xA4\x01\x00\x00\x00\x18' + b'\x42\x00\xA5\x07\x00\x00\x00\x0D\x41\x43\x4D\x45\x20\x4C\x4F\x43' + b'\x41\x54\x49\x4F\x4E\x00\x00\x00\x42\x00\xA4\x01\x00\x00\x00\x18' + b'\x42\x00\xA5\x07\x00\x00\x00\x0D\x41\x43\x4D\x45\x20\x5A\x49\x50' + b'\x20\x43\x4F\x44\x45\x00\x00\x00')) + + def tearDown(self): + super(TestQueryResponsePayload, self).tearDown() + + def test_init_with_none(self): + """ + Test that a QueryResponsePayload object can be constructed with no + specified value. + """ + query.QueryResponsePayload() + + def test_init_with_args(self): + """ + Test that a QueryResponsePayload object can be constructed with valid + values. + """ + query.QueryResponsePayload( + operations=self.operations, + object_types=self.object_types, + vendor_identification=self.vendor_identification, + server_information=self.server_information, + application_namespaces=self.application_namespaces, + extension_information=self.extension_information) + + def test_validate_with_invalid_operations_list(self): + """ + Test that a TypeError exception is raised when an invalid Operations + list is used to construct a QueryResponsePayload object. + """ + kwargs = {'operations': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid operations list", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_operations_item(self): + """ + Test that a TypeError exception is raised when an invalid Operations + item is used to construct a QueryResponsePayload object. + """ + kwargs = {'operations': ['invalid']} + self.assertRaisesRegexp( + TypeError, "invalid operation", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_object_types_list(self): + """ + Test that a TypeError exception is raised when an invalid ObjectTypes + list is used to construct a QueryResponsePayload object. + """ + kwargs = {'object_types': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid object types list", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_object_types_item(self): + """ + Test that a TypeError exception is raised when an invalid ObjectTypes + item is used to construct a QueryResponsePayload object. + """ + kwargs = {'object_types': ['invalid']} + self.assertRaisesRegexp( + TypeError, "invalid object type", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_vendor_identification(self): + """ + Test that a TypeError exception is raised when an invalid + VendorIdentification item is used to construct a QueryResponsePayload + object. + """ + kwargs = {'vendor_identification': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid vendor identification", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_server_information(self): + """ + Test that a TypeError exception is raised when an invalid + ServerInformation item is used to construct a QueryResponsePayload + object. + """ + kwargs = {'server_information': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid server information", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_application_namespaces_list(self): + """ + Test that a TypeError exception is raised when an invalid + ApplicationNamespaces list is used to construct a QueryResponsePayload + object. + """ + kwargs = {'application_namespaces': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid application namespaces list", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_application_namespaces_item(self): + """ + Test that a TypeError exception is raised when an invalid + ApplicationNamespaces item is used to construct a QueryResponsePayload + object. + """ + kwargs = {'application_namespaces': ['invalid']} + self.assertRaisesRegexp( + TypeError, "invalid application namespace", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_extension_information_list(self): + """ + Test that a TypeError exception is raised when an invalid + ExtensionInformation list is used to construct a QueryResponsePayload + object. + """ + kwargs = {'extension_information': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid extension information list", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_extension_information_item(self): + """ + Test that a TypeError exception is raised when an invalid + ExtensionInformation item is used to construct a QueryResponsePayload + object. + """ + kwargs = {'extension_information': ['invalid']} + self.assertRaisesRegexp( + TypeError, "invalid extension information", + query.QueryResponsePayload, **kwargs) + + def _test_read(self, stream, operations, object_types, + vendor_identification, server_information, + application_namespaces, extension_information): + payload = query.QueryResponsePayload() + payload.read(stream) + + # Test decoding of all operations. + expected = len(operations) + observed = len(payload.operations) + + msg = "operations list decoding mismatch" + msg += "; expected {0} results, received {1}".format( + expected, observed) + self.assertEqual(expected, observed, msg) + + for i in xrange(len(operations)): + expected = operations[i] + observed = payload.operations[i] + + msg = "operation decoding mismatch" + msg += "; expected {0}, received {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + # Test decoding of all object types. + expected = len(object_types) + observed = len(payload.object_types) + + msg = "object types list decoding mismatch" + msg += "; expected {0} results, received {1}".format( + expected, observed) + self.assertEqual(expected, observed, msg) + + for i in xrange(len(object_types)): + expected = object_types[i] + observed = payload.object_types[i] + + msg = "object type decoding mismatch" + msg += "; expected {0}, received {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + # Test decoding of vendor identification. + expected = vendor_identification + observed = payload.vendor_identification + + msg = "vendor identification decoding mismatch" + msg += "; expected {0}, received {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + # Test decoding of server information. + expected = server_information + observed = payload.server_information + + msg = "server information decoding mismatch" + msg += "; expected {0}, received {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + # Test decoding of all application namespaces. + expected = len(application_namespaces) + observed = len(payload.application_namespaces) + + msg = "application namespaces list decoding mismatch" + msg += "; expected {0} results, received {1}".format( + expected, observed) + self.assertEqual(expected, observed, msg) + + # Test decoding of all extension information. + expected = len(extension_information) + observed = len(payload.extension_information) + + msg = "extension information list decoding mismatch" + msg += "; expected {0} results, received {1}".format( + expected, observed) + self.assertEqual(expected, observed, msg) + + for i in xrange(len(extension_information)): + expected = extension_information[i] + observed = payload.extension_information[i] + + msg = "extension information decoding mismatch" + msg += "; expected {0}, received {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_read_with_no_data(self): + """ + Test that a QueryResponsePayload object with no data can be read from + a data stream. + """ + self._test_read( + self.encoding_a, list(), list(), None, None, list(), list()) + + def test_read_with_operations_object_types_and_server_info(self): + """ + Test that a QueryResponsePayload object with multiple pieces of data + can be read from a data stream. + """ + self._test_read( + self.encoding_b, self.operations, self.object_types, + self.vendor_identification, self.server_information, + self.application_namespaces, list()) + + def test_read_with_extension_information(self): + """ + Test that a QueryResponsePayload object with one piece of data can be + read from a data stream. + """ + self._test_read( + self.encoding_c, list(), list(), None, None, + self.application_namespaces, self.extension_information) + + def _test_write(self, encoding, operations, object_types, + vendor_identification, server_information, + application_namespaces, extension_information): + stream = utils.BytearrayStream() + payload = query.QueryResponsePayload( + operations, object_types, vendor_identification, + server_information, application_namespaces, extension_information) + payload.write(stream) + + length_expected = len(encoding) + length_received = len(stream) + + msg = "encoding lengths not equal" + msg += "; expected {0}, received {1}".format( + length_expected, length_received) + self.assertEqual(length_expected, length_received, msg) + + msg = "encoding mismatch" + msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(encoding, stream) + + self.assertEqual(encoding, stream, msg) + + def test_write_with_no_data(self): + """ + Test that a QueryResponsePayload object with no data can be written to + a data stream. + """ + self._test_write( + self.encoding_a, list(), list(), None, None, list(), list()) + + def test_write_with_operations_object_types_and_server_info(self): + """ + Test that a QueryResponsePayload object with multiple pieces of data + can be written to a data stream. + """ + self._test_write( + self.encoding_b, self.operations, self.object_types, + self.vendor_identification, self.server_information, + self.application_namespaces, list()) + + def test_write_with_extension_information(self): + """ + Test that a QueryResponsePayload object with one piece of data can be + written to a data stream. + """ + self._test_write( + self.encoding_c, list(), list(), None, None, + self.application_namespaces, self.extension_information) diff --git a/kmip/tests/core/misc/__init__.py b/kmip/tests/core/misc/__init__.py new file mode 100644 index 0000000..417e2f9 --- /dev/null +++ b/kmip/tests/core/misc/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/kmip/tests/core/misc/test_misc.py b/kmip/tests/core/misc/test_misc.py new file mode 100644 index 0000000..443dacb --- /dev/null +++ b/kmip/tests/core/misc/test_misc.py @@ -0,0 +1,117 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from six import string_types +from testtools import TestCase + +from kmip.core.enums import QueryFunction as QueryFunctionEnum + +from kmip.core.misc import QueryFunction +from kmip.core.misc import VendorIdentification + + +class TestQueryFunction(TestCase): + """ + A test suite for the QueryFunction class. + + Since QueryFunction is a simple wrapper for the Enumeration primitive, + only a few tests pertaining to construction are needed. + """ + + def setUp(self): + super(TestQueryFunction, self).setUp() + + def tearDown(self): + super(TestQueryFunction, self).tearDown() + + def _test_init(self, value): + if (isinstance(value, QueryFunctionEnum)) or (value is None): + query_function = QueryFunction(value) + + msg = "expected {0}, observed {1}".format( + value, query_function.enum) + self.assertEqual(value, query_function.enum, msg) + else: + self.assertRaises(TypeError, QueryFunction, value) + + def test_init_with_none(self): + """ + Test that a QueryFunction object can be constructed with no specified + value. + """ + self._test_init(None) + + def test_init_with_valid(self): + """ + Test that a QueryFunction object can be constructed with a valid + QueryFunction enumeration value. + """ + self._test_init(QueryFunctionEnum.QUERY_OBJECTS) + + def test_init_with_invalid(self): + """ + Test that a TypeError exception is raised when a non QueryFunction + enumeration value is used to construct a QueryFunction object. + """ + self._test_init("invalid") + + +class TestVendorIdentification(TestCase): + """ + A test suite for the VendorIdentification class. + + Since VendorIdentification is a simple wrapper for the TextString + primitive, only a few tests pertaining to construction are needed. + """ + + def setUp(self): + super(TestVendorIdentification, self).setUp() + + def tearDown(self): + super(TestVendorIdentification, self).tearDown() + + def _test_init(self, value): + if (isinstance(value, string_types)) or (value is None): + vendor_identification = VendorIdentification(value) + + if value is None: + value = '' + + msg = "expected {0}, observed {1}".format( + value, vendor_identification.value) + self.assertEqual(value, vendor_identification.value, msg) + else: + self.assertRaises(TypeError, VendorIdentification, value) + + def test_init_with_none(self): + """ + Test that a VendorIdentification object can be constructed with no + specified value. + """ + self._test_init(None) + + def test_init_with_valid(self): + """ + Test that a VendorIdentification object can be constructed with a + valid, string-type value. + """ + self._test_init("valid") + + def test_init_with_invalid(self): + """ + Test that a TypeError exception is raised when a non-string value is + used to construct a VendorIdentification object. + """ + self._test_init(0) diff --git a/kmip/tests/core/misc/test_server_information.py b/kmip/tests/core/misc/test_server_information.py new file mode 100644 index 0000000..863cc2b --- /dev/null +++ b/kmip/tests/core/misc/test_server_information.py @@ -0,0 +1,252 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from six import string_types +from testtools import TestCase + +from kmip.core.misc import ServerInformation +from kmip.core.utils import BytearrayStream + + +class TestServerInformation(TestCase): + """ + A test suite for the ServerInformation class. + """ + + def setUp(self): + super(TestServerInformation, self).setUp() + + self.data = BytearrayStream(b'\x00\x01\x02\x03') + + self.encoding_a = BytearrayStream( + b'\x42\x00\x88\x01\x00\x00\x00\x00') + self.encoding_b = BytearrayStream( + b'\x42\x00\x88\x01\x00\x00\x00\x04\x00\x01\x02\x03') + + def tearDown(self): + super(TestServerInformation, self).tearDown() + + def test_init(self): + ServerInformation() + + def _test_read(self, stream, data): + server_information = ServerInformation() + server_information.read(stream) + + expected = data + observed = server_information.data + + msg = "data decoding mismatch" + msg += "; expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_read_with_none(self): + """ + Test that a ServerInformation object with no data can be read from a + data stream. + """ + self._test_read(self.encoding_a, BytearrayStream()) + + def test_read_with_data(self): + """ + Test that a ServerInformation object with data can be read from a + data stream. + """ + self._test_read(self.encoding_b, self.data) + + def _test_write(self, stream_expected, data): + stream_observed = BytearrayStream() + server_information = ServerInformation() + + if data is not None: + server_information.data = data + + server_information.write(stream_observed) + + length_expected = len(stream_expected) + length_observed = len(stream_observed) + + msg = "encoding lengths not equal" + msg += "; expected {0}, observed {1}".format( + length_expected, length_observed) + self.assertEqual(length_expected, length_observed, msg) + + msg = "encoding mismatch" + msg += ";\nexpected:\n{0}\nobserved:\n{1}".format( + stream_expected, stream_observed) + self.assertEqual(stream_expected, stream_observed, msg) + + def test_write_with_none(self): + """ + Test that a ServerInformation object with no data can be written to a + data stream. + """ + self._test_write(self.encoding_a, None) + + def test_write_with_data(self): + """ + Test that a ServerInformation object with data can be written to a + data stream. + """ + self._test_write(self.encoding_b, self.data) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + ServerInformation objects with the same internal data. + """ + a = ServerInformation() + b = ServerInformation() + + a.data = self.data + b.data = self.data + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_equal_and_empty(self): + """ + Test that the equality operator returns True when comparing two + ServerInformation objects with no internal data. + """ + a = ServerInformation() + b = ServerInformation() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal(self): + """ + Test that the equality operator returns False when comparing two + ServerInformation objects with different sets of internal data. + """ + a = ServerInformation() + b = ServerInformation() + + a.data = self.data + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing a + ServerInformation object to a non-ServerInformation object. + """ + a = ServerInformation() + b = "invalid" + + self.assertFalse(a == b) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing + two ServerInformation objects with the same internal data. + """ + a = ServerInformation() + b = ServerInformation() + + a.data = self.data + b.data = self.data + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_equal_and_empty(self): + """ + Test that the inequality operator returns False when comparing + two ServerInformation objects with no internal data. + """ + a = ServerInformation() + b = ServerInformation() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal(self): + """ + Test that the inequality operator returns True when comparing two + ServerInformation objects with different sets of internal data. + """ + a = ServerInformation() + b = ServerInformation() + + a.data = self.data + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing a + ServerInformation object to a non-ServerInformation object. + """ + a = ServerInformation() + b = "invalid" + + self.assertTrue(a != b) + + def test_repr(self): + """ + Test that the representation of a ServerInformation object is + formatted properly and can be used by eval to create a new + ServerInformation object. + """ + server_information = ServerInformation() + + expected = "ServerInformation()" + observed = repr(server_information) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + expected = server_information + observed = eval(observed) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def _test_str(self, data): + server_information = ServerInformation() + server_information.data = data + str_repr = str(server_information) + + expected = len(str(data)) + observed = len(str_repr) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + # TODO (peter-hamilton) This should be binary_type. Fix involves + # TODO (peter-hamilton) refining BytearrayStream implementation. + expected = string_types + observed = str_repr + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertIsInstance(observed, expected, msg) + + def test_str_with_no_data(self): + """ + Test that the string representation of a ServerInformation object + is formatted properly when there is no internal data. + """ + self._test_str(BytearrayStream()) + + def test_str_with_data(self): + """ + Test that the string representation of a ServerInformation object + is formatted properly when there is internal data. + """ + self._test_str(self.data) diff --git a/kmip/tests/core/objects/__init__.py b/kmip/tests/core/objects/__init__.py new file mode 100644 index 0000000..417e2f9 --- /dev/null +++ b/kmip/tests/core/objects/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/kmip/tests/core/objects/test_extension_information.py b/kmip/tests/core/objects/test_extension_information.py new file mode 100644 index 0000000..e3fa7d9 --- /dev/null +++ b/kmip/tests/core/objects/test_extension_information.py @@ -0,0 +1,444 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from testtools import TestCase + +from kmip.core.objects import ExtensionInformation +from kmip.core.objects import ExtensionName +from kmip.core.objects import ExtensionTag +from kmip.core.objects import ExtensionType + +from kmip.core.utils import BytearrayStream + + +class TestExtensionInformation(TestCase): + """ + A test suite for the ExtensionInformation class. + + Test encodings obtained from Section 12.2 of the KMIP 1.1 Test Cases + documentation. + """ + + def setUp(self): + super(TestExtensionInformation, self).setUp() + + self.extension_name_b = ExtensionName('ACME LOCATION') + self.extension_name_c = ExtensionName('ACME LOCATION') + self.extension_name_d = ExtensionName('ACME ZIP CODE') + + self.extension_tag_c = ExtensionTag(5548545) + self.extension_tag_d = ExtensionTag(5548546) + + self.extension_type_c = ExtensionType(7) + self.extension_type_d = ExtensionType(2) + + self.encoding_a = BytearrayStream( + b'\x42\x00\xA4\x01\x00\x00\x00\x08\x42\x00\xA5\x07\x00\x00\x00' + b'\x00') + self.encoding_b = BytearrayStream( + b'\x42\x00\xA4\x01\x00\x00\x00\x18\x42\x00\xA5\x07\x00\x00\x00\x0D' + b'\x41\x43\x4D\x45\x20\x4C\x4F\x43\x41\x54\x49\x4F\x4E\x00\x00' + b'\x00') + self.encoding_c = BytearrayStream( + b'\x42\x00\xA4\x01\x00\x00\x00\x38\x42\x00\xA5\x07\x00\x00\x00\x0D' + b'\x41\x43\x4D\x45\x20\x4C\x4F\x43\x41\x54\x49\x4F\x4E\x00\x00\x00' + b'\x42\x00\xA6\x02\x00\x00\x00\x04\x00\x54\xAA\x01\x00\x00\x00\x00' + b'\x42\x00\xA7\x02\x00\x00\x00\x04\x00\x00\x00\x07\x00\x00\x00' + b'\x00') + self.encoding_d = BytearrayStream( + b'\x42\x00\xA4\x01\x00\x00\x00\x38\x42\x00\xA5\x07\x00\x00\x00\x0D' + b'\x41\x43\x4D\x45\x20\x5A\x49\x50\x20\x43\x4F\x44\x45\x00\x00\x00' + b'\x42\x00\xA6\x02\x00\x00\x00\x04\x00\x54\xAA\x02\x00\x00\x00\x00' + b'\x42\x00\xA7\x02\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00' + b'\x00') + + def tearDown(self): + super(TestExtensionInformation, self).tearDown() + + def _test_init(self): + pass + + def test_init_with_none(self): + ExtensionInformation() + + def test_init_with_args(self): + ExtensionInformation( + extension_name=ExtensionName(), + extension_tag=ExtensionTag(), + extension_type=ExtensionType()) + + def test_validate_with_invalid_extension_name(self): + """ + Test that a TypeError exception is raised when an invalid + ExtensionName is used to construct an ExtensionInformation object. + """ + kwargs = {'extension_name': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid extension name", + ExtensionInformation, **kwargs) + + def test_validate_with_invalid_extension_tag(self): + """ + Test that a TypeError exception is raised when an invalid + ExtensionTag is used to construct an ExtensionInformation object. + """ + kwargs = {'extension_tag': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid extension tag", + ExtensionInformation, **kwargs) + + def test_validate_with_invalid_extension_type(self): + """ + Test that a TypeError exception is raised when an invalid + ExtensionType is used to construct an ExtensionInformation object. + """ + kwargs = {'extension_type': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid extension type", + ExtensionInformation, **kwargs) + + def _test_read(self, stream, extension_name, extension_tag, + extension_type): + extension_information = ExtensionInformation() + extension_information.read(stream) + + if extension_name is None: + extension_name = ExtensionName() + + msg = "extension name encoding mismatch" + msg += "; expected {0}, observed {1}".format( + extension_name, + extension_information.extension_name) + self.assertEqual( + extension_name, + extension_information.extension_name, msg) + + msg = "extension tag encoding mismatch" + msg += "; expected {0}, observed {1}".format( + extension_tag, + extension_information.extension_tag) + self.assertEqual( + extension_tag, + extension_information.extension_tag, msg) + + msg = "extension type encoding mismatch" + msg += "; expected {0}, observed {1}".format( + extension_type, + extension_information.extension_type) + self.assertEqual( + extension_type, + extension_information.extension_type, msg) + + def test_read_with_none(self): + """ + Test that an ExtensionInformation object with no data can be read from + a data stream. + """ + self._test_read(self.encoding_a, None, None, None) + + def test_read_with_partial_args(self): + """ + Test that an ExtensionInformation object with some data can be read + from a data stream. + """ + self._test_read(self.encoding_b, self.extension_name_b, None, None) + + def test_read_with_multiple_args(self): + """ + Test that an ExtensionInformation object with data can be read from a + data stream. + """ + self._test_read(self.encoding_c, self.extension_name_c, + self.extension_tag_c, self.extension_type_c) + + def _test_write(self, stream_expected, extension_name, extension_tag, + extension_type): + stream_observed = BytearrayStream() + extension_information = ExtensionInformation( + extension_name=extension_name, + extension_tag=extension_tag, + extension_type=extension_type) + extension_information.write(stream_observed) + + length_expected = len(stream_expected) + length_observed = len(stream_observed) + + msg = "encoding lengths not equal" + msg += "; expected {0}, observed {1}".format( + length_expected, length_observed) + self.assertEqual(length_expected, length_observed, msg) + + msg = "encoding mismatch" + msg += ";\nexpected:\n{0}\nobserved:\n{1}".format( + stream_expected, stream_observed) + self.assertEqual(stream_expected, stream_observed, msg) + + def test_write_with_none(self): + """ + Test that an ExtensionInformation object with no data can be written + to a data stream. + """ + self._test_write(self.encoding_a, None, None, None) + + def test_write_with_partial_args(self): + """ + Test that an ExtensionInformation object with some data can be written + to a data stream. + """ + self._test_write(self.encoding_b, self.extension_name_b, None, None) + + def test_write_with_multiple_args(self): + """ + Test that an ExtensionInformation object with data can be written to + a data stream. + """ + self._test_write(self.encoding_c, self.extension_name_c, + self.extension_tag_c, self.extension_type_c) + + def _test_create(self, extension_name, extension_tag, extension_type): + extension_information = ExtensionInformation.create( + extension_name=extension_name, + extension_tag=extension_tag, + extension_type=extension_type) + + self.assertIsInstance(extension_information, ExtensionInformation) + + expected = ExtensionName(extension_name) + observed = extension_information.extension_name + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + expected = ExtensionTag(extension_tag) + observed = extension_information.extension_tag + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + expected = ExtensionType(extension_type) + observed = extension_information.extension_type + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_create_with_none(self): + """ + Test that an ExtensionInformation object with no data can be created + using the create class method. + """ + self._test_create(None, None, None) + + def test_create_with_args(self): + """ + Test that an ExtensionInformation object with data can be created + using the create class method. + """ + self._test_create('ACME LOCATION', 5548545, 7) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + ExtensionInformation objects with the same internal data. + """ + a = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + b = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_equal_and_empty(self): + """ + Test that the equality operator returns True when comparing two + ExtensionInformation objects with no internal data. + """ + a = ExtensionInformation() + b = ExtensionInformation() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal(self): + """ + Test that the equality operator returns False when comparing two + ExtensionInformation objects with different sets of internal data. + """ + a = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + b = ExtensionInformation() + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing an + ExtensionInformation object with a non-ExtensionInformation object. + """ + a = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + b = "invalid" + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + ExtensionInformation objects with the same internal data. + """ + a = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + b = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_equal_and_empty(self): + """ + Test that the inequality operator returns False when comparing two + ExtensionInformation objects with no internal data. + """ + a = ExtensionInformation() + b = ExtensionInformation() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal(self): + """ + Test that the inequality operator returns True when comparing two + ExtensionInformation objects with the different sets of internal data. + """ + a = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + b = ExtensionInformation() + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing an + ExtensionInformation object with a non-ExtensionInformation object. + """ + a = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + b = "invalid" + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr_with_no_data(self): + """ + Test that the representation of an ExtensionInformation object with no + data is formatted properly and can be used by eval to create a new + ExtensionInformation object identical to the original. + """ + extension_information = ExtensionInformation() + + expected = "ExtensionInformation(" + expected += "extension_name=ExtensionName(value=''), " + expected += "extension_tag=None, " + expected += "extension_type=None)" + observed = repr(extension_information) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + expected = extension_information + observed = eval(repr(extension_information)) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_repr_with_data(self): + """ + Test that the representation of an ExtensionInformation object with + data is formatted properly and can be used by eval to create a new + ExtensionInformation object identical to the original. + """ + extension_information = ExtensionInformation( + extension_name=ExtensionName('ACME LOCATION'), + extension_tag=ExtensionTag(5548545), + extension_type=ExtensionType(7)) + + expected = "ExtensionInformation(" + expected += "extension_name=ExtensionName(value='ACME LOCATION'), " + expected += "extension_tag=ExtensionTag(value=5548545), " + expected += "extension_type=ExtensionType(value=7))" + observed = repr(extension_information) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + expected = extension_information + observed = eval(repr(extension_information)) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_str_with_no_data(self): + """ + Test that the string representation of an ExtensionInformation object + is formatted properly when there is no internal data. + """ + extension_information = ExtensionInformation() + + expected = "ExtensionInformation(" + expected += "extension_name=ExtensionName(value=''), " + expected += "extension_tag=None, " + expected += "extension_type=None)" + observed = str(extension_information) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_str_with_data(self): + """ + Test that the string representation of an ExtensionInformation object + is formatted properly when there is internal data. + """ + extension_information = ExtensionInformation( + extension_name=ExtensionName('ACME LOCATION'), + extension_tag=ExtensionTag(5548545), + extension_type=ExtensionType(7)) + + expected = "ExtensionInformation(" + expected += "extension_name=ExtensionName(value='ACME LOCATION'), " + expected += "extension_tag=ExtensionTag(value=5548545), " + expected += "extension_type=ExtensionType(value=7))" + observed = str(extension_information) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) diff --git a/kmip/tests/core/objects/test_objects.py b/kmip/tests/core/objects/test_objects.py new file mode 100644 index 0000000..63f6ae6 --- /dev/null +++ b/kmip/tests/core/objects/test_objects.py @@ -0,0 +1,168 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from six import string_types +from testtools import TestCase + +from kmip.core.objects import ExtensionName +from kmip.core.objects import ExtensionTag +from kmip.core.objects import ExtensionType + + +class TestExtensionName(TestCase): + """ + A test suite for the ExtensionName class. + + Since ExtensionName is a simple wrapper for the TextString primitive, only + a few tests pertaining to construction are needed. + """ + + def setUp(self): + super(TestExtensionName, self).setUp() + + def tearDown(self): + super(TestExtensionName, self).tearDown() + + def _test_init(self, value): + if (isinstance(value, string_types)) or (value is None): + extension_name = ExtensionName(value) + + if value is None: + value = '' + + msg = "expected {0}, observed {1}".format( + value, extension_name.value) + self.assertEqual(value, extension_name.value, msg) + else: + self.assertRaises(TypeError, ExtensionName, value) + + def test_init_with_none(self): + """ + Test that an ExtensionName object can be constructed with no specified + value. + """ + self._test_init(None) + + def test_init_with_valid(self): + """ + Test that an ExtensionName object can be constructed with a valid + string value. + """ + self._test_init("valid") + + def test_init_with_invalid(self): + """ + Test that a TypeError exception is raised when a non-string value is + used to construct an ExtensionName object. + """ + self._test_init(0) + + +class TestExtensionTag(TestCase): + """ + A test suite for the ExtensionTag class. + + Since ExtensionTag is a simple wrapper for the Integer primitive, only a + few tests pertaining to construction are needed. + """ + + def setUp(self): + super(TestExtensionTag, self).setUp() + + def tearDown(self): + super(TestExtensionTag, self).tearDown() + + def _test_init(self, value): + if (isinstance(value, int)) or (value is None): + extension_tag = ExtensionTag(value) + + if value is None: + value = 0 + + msg = "expected {0}, observed {1}".format( + value, extension_tag.value) + self.assertEqual(value, extension_tag.value, msg) + else: + self.assertRaises(TypeError, ExtensionTag, value) + + def test_init_with_none(self): + """ + Test that an ExtensionTag object can be constructed with no specified + value. + """ + self._test_init(None) + + def test_init_with_valid(self): + """ + Test that an ExtensionTag object can be constructed with a valid + integer value. + """ + self._test_init(0) + + def test_init_with_invalid(self): + """ + Test that a TypeError exception is raised when a non-integer value is + used to construct an ExtensionName object. + """ + self._test_init("invalid") + + +class TestExtensionType(TestCase): + """ + A test suite for the ExtensionType class. + + Since ExtensionType is a simple wrapper for the Integer primitive, only a + few tests pertaining to construction are needed. + """ + + def setUp(self): + super(TestExtensionType, self).setUp() + + def tearDown(self): + super(TestExtensionType, self).tearDown() + + def _test_init(self, value): + if (isinstance(value, int)) or (value is None): + extension_type = ExtensionType(value) + + if value is None: + value = 0 + + msg = "expected {0}, observed {1}".format( + value, extension_type.value) + self.assertEqual(value, extension_type.value, msg) + else: + self.assertRaises(TypeError, ExtensionType, value) + + def test_init_with_none(self): + """ + Test that an ExtensionType object can be constructed with no specified + value. + """ + self._test_init(None) + + def test_init_with_valid(self): + """ + Test that an ExtensionType object can be constructed with a valid + integer value. + """ + self._test_init(0) + + def test_init_with_invalid(self): + """ + Test that a TypeError exception is raised when a non-string value is + used to construct an ExtensionType object. + """ + self._test_init("invalid") diff --git a/kmip/tests/services/test_kmip_client.py b/kmip/tests/services/test_kmip_client.py index 1b9880c..bfae85c 100644 --- a/kmip/tests/services/test_kmip_client.py +++ b/kmip/tests/services/test_kmip_client.py @@ -30,6 +30,7 @@ from kmip.core.enums import CryptographicUsageMask from kmip.core.enums import ObjectType from kmip.core.enums import Operation as OperationEnum from kmip.core.enums import KeyFormatType +from kmip.core.enums import QueryFunction as QueryFunctionEnum from kmip.core.enums import ResultStatus from kmip.core.enums import ResultReason @@ -51,10 +52,15 @@ from kmip.core.messages.payloads.create_key_pair import \ CreateKeyPairRequestPayload, CreateKeyPairResponsePayload from kmip.core.messages.payloads.discover_versions import \ DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload +from kmip.core.messages.payloads.query import \ + QueryRequestPayload, QueryResponsePayload from kmip.core.messages.payloads.rekey_key_pair import \ RekeyKeyPairRequestPayload, RekeyKeyPairResponsePayload from kmip.core.misc import Offset +from kmip.core.misc import QueryFunction +from kmip.core.misc import ServerInformation +from kmip.core.misc import VendorIdentification from kmip.core.objects import Attribute from kmip.core.objects import CommonTemplateAttribute @@ -68,6 +74,7 @@ from kmip.services.kmip_client import KMIPProxy from kmip.services.results import CreateKeyPairResult from kmip.services.results import DiscoverVersionsResult +from kmip.services.results import QueryResult from kmip.services.results import RekeyKeyPairResult import kmip.core.utils as utils @@ -594,6 +601,41 @@ class TestKMIPClient(TestCase): self._test_build_rekey_key_pair_batch_item( None, None, None, None, None) + def _test_build_query_batch_item(self, query_functions): + batch_item = self.client._build_query_batch_item(query_functions) + + base = "expected {0}, received {1}" + msg = base.format(RequestBatchItem, batch_item) + self.assertIsInstance(batch_item, RequestBatchItem, msg) + + operation = batch_item.operation + + msg = base.format(Operation, operation) + self.assertIsInstance(operation, Operation, msg) + + operation_enum = operation.enum + + msg = base.format(OperationEnum.QUERY, operation_enum) + self.assertEqual(OperationEnum.QUERY, operation_enum, msg) + + payload = batch_item.request_payload + + if query_functions is None: + query_functions = list() + + msg = base.format(QueryRequestPayload, payload) + self.assertIsInstance(payload, QueryRequestPayload, msg) + + query_functions_observed = payload.query_functions + self.assertEqual(query_functions, query_functions_observed) + + def test_build_query_batch_item_with_input(self): + self._test_build_query_batch_item( + [QueryFunction(QueryFunctionEnum.QUERY_OBJECTS)]) + + def test_build_query_batch_item_without_input(self): + self._test_build_query_batch_item(None) + def _test_build_discover_versions_batch_item(self, protocol_versions): batch_item = self.client._build_discover_versions_batch_item( protocol_versions) @@ -680,6 +722,10 @@ class TestKMIPClient(TestCase): self.assertRaisesRegexp(ValueError, "no processor for operation", self.client._get_batch_item_processor, None) + def _test_equality(self, expected, observed): + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + def test_process_create_key_pair_batch_item(self): batch_item = ResponseBatchItem( operation=Operation(OperationEnum.CREATE_KEY_PAIR), @@ -698,6 +744,64 @@ class TestKMIPClient(TestCase): msg = "expected {0}, received {1}".format(RekeyKeyPairResult, result) self.assertIsInstance(result, RekeyKeyPairResult, msg) + def _test_process_query_batch_item( + self, + operations, + object_types, + vendor_identification, + server_information, + application_namespaces, + extension_information): + + payload = QueryResponsePayload( + operations, + object_types, + vendor_identification, + server_information, + application_namespaces, + extension_information) + batch_item = ResponseBatchItem( + operation=Operation(OperationEnum.QUERY), + response_payload=payload) + + result = self.client._process_query_batch_item(batch_item) + + base = "expected {0}, observed {1}" + msg = base.format(QueryResult, result) + self.assertIsInstance(result, QueryResult, msg) + + # The payload maps the following inputs to empty lists on None. + if operations is None: + operations = list() + if object_types is None: + object_types = list() + if application_namespaces is None: + application_namespaces = list() + if extension_information is None: + extension_information = list() + + self._test_equality(operations, result.operations) + self._test_equality(object_types, result.object_types) + self._test_equality( + vendor_identification, result.vendor_identification) + self._test_equality(server_information, result.server_information) + self._test_equality( + application_namespaces, result.application_namespaces) + self._test_equality( + extension_information, result.extension_information) + + def test_process_query_batch_item_with_results(self): + self._test_process_query_batch_item( + list(), + list(), + VendorIdentification(), + ServerInformation(), + list(), + list()) + + def test_process_query_batch_item_without_results(self): + self._test_process_query_batch_item(None, None, None, None, None, None) + def _test_process_discover_versions_batch_item(self, protocol_versions): batch_item = ResponseBatchItem( operation=Operation(OperationEnum.DISCOVER_VERSIONS),