From 80ee64e600c7c9b63d141ac9a585248bb5995b34 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Mon, 23 Feb 2015 17:18:05 -0500 Subject: [PATCH] Adding support for the Query operation This change adds support for the Query operation, including updates to the KMIP client and core object libraries, the KMIP client and core unit test suites, and a Query unit demo. --- kmip/core/enums.py | 9 + kmip/core/factories/payloads/request.py | 4 + kmip/core/factories/payloads/response.py | 4 + kmip/core/messages/payloads/query.py | 339 ++++++++++ kmip/core/misc.py | 171 ++++- kmip/core/objects.py | 241 +++++++ kmip/core/primitives.py | 19 +- kmip/core/utils.py | 20 +- kmip/demos/units/query.py | 110 ++++ kmip/demos/utils.py | 2 + kmip/services/kmip_client.py | 65 ++ kmip/services/results.py | 54 ++ .../core/factories/payloads/test_request.py | 5 +- .../core/factories/payloads/test_response.py | 5 +- .../core/messages/payloads/test_query.py | 587 ++++++++++++++++++ kmip/tests/core/misc/__init__.py | 14 + kmip/tests/core/misc/test_misc.py | 117 ++++ .../core/misc/test_server_information.py | 252 ++++++++ kmip/tests/core/objects/__init__.py | 14 + .../objects/test_extension_information.py | 444 +++++++++++++ kmip/tests/core/objects/test_objects.py | 168 +++++ kmip/tests/services/test_kmip_client.py | 104 ++++ 22 files changed, 2731 insertions(+), 17 deletions(-) create mode 100644 kmip/core/messages/payloads/query.py create mode 100644 kmip/demos/units/query.py create mode 100644 kmip/tests/core/messages/payloads/test_query.py create mode 100644 kmip/tests/core/misc/__init__.py create mode 100644 kmip/tests/core/misc/test_misc.py create mode 100644 kmip/tests/core/misc/test_server_information.py create mode 100644 kmip/tests/core/objects/__init__.py create mode 100644 kmip/tests/core/objects/test_extension_information.py create mode 100644 kmip/tests/core/objects/test_objects.py diff --git a/kmip/core/enums.py b/kmip/core/enums.py index c481f65..f01b069 100644 --- a/kmip/core/enums.py +++ b/kmip/core/enums.py @@ -458,6 +458,15 @@ class KeyRoleType(Enum): PVKOTH = 0x00000015 +# 9.1.3.2.24 +class QueryFunction(Enum): + QUERY_OPERATIONS = 0x00000001 + QUERY_OBJECTS = 0x00000002 + QUERY_SERVER_INFORMATION = 0x00000003 + QUERY_APPLICATION_NAMESPACES = 0x00000004 + QUERY_EXTENSION_LIST = 0x00000005 + QUERY_EXTENSION_MAP = 0x00000006 + # 9.1.3.2.27 class Operation(Enum): CREATE = 0x00000001 diff --git a/kmip/core/factories/payloads/request.py b/kmip/core/factories/payloads/request.py index adc9958..41e5b6c 100644 --- a/kmip/core/factories/payloads/request.py +++ b/kmip/core/factories/payloads/request.py @@ -21,6 +21,7 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -48,5 +49,8 @@ class RequestPayloadFactory(PayloadFactory): def _create_destroy_payload(self): return destroy.DestroyRequestPayload() + def _create_query_payload(self): + return query.QueryRequestPayload() + def _create_discover_versions_payload(self): return discover_versions.DiscoverVersionsRequestPayload() diff --git a/kmip/core/factories/payloads/response.py b/kmip/core/factories/payloads/response.py index 69a7c31..88e0a49 100644 --- a/kmip/core/factories/payloads/response.py +++ b/kmip/core/factories/payloads/response.py @@ -21,6 +21,7 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -48,5 +49,8 @@ class ResponsePayloadFactory(PayloadFactory): def _create_destroy_payload(self): return destroy.DestroyResponsePayload() + def _create_query_payload(self): + return query.QueryResponsePayload() + def _create_discover_versions_payload(self): return discover_versions.DiscoverVersionsResponsePayload() diff --git a/kmip/core/messages/payloads/query.py b/kmip/core/messages/payloads/query.py new file mode 100644 index 0000000..248111d --- /dev/null +++ b/kmip/core/messages/payloads/query.py @@ -0,0 +1,339 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from six.moves import xrange + +from kmip.core.attributes import ApplicationNamespace +from kmip.core.attributes import ObjectType + +from kmip.core.enums import Tags +from kmip.core.messages.contents import Operation + +from kmip.core.misc import QueryFunction +from kmip.core.misc import ServerInformation +from kmip.core.misc import VendorIdentification + +from kmip.core.objects import ExtensionInformation +from kmip.core.primitives import Struct +from kmip.core.utils import BytearrayStream + + +class QueryRequestPayload(Struct): + """ + A request payload for the Query operation. + + The payload contains a list of query functions that the KMIP server should + respond to. See Section 4.25 of the KMIP 1.1 specification for more + information. + + Attributes: + query_functions: A list of QueryFunction enumerations. + """ + def __init__(self, query_functions=None): + """ + Construct a QueryRequestPayload object. + + Args: + query_functions (list): A list of QueryFunction enumerations. + """ + super(QueryRequestPayload, self).__init__(Tags.REQUEST_PAYLOAD) + + if query_functions is None: + self.query_functions = list() + else: + self.query_functions = query_functions + + self.validate() + + def read(self, istream): + """ + Read the data encoding the QueryRequestPayload object and decode it + into its constituent parts. + + Args: + istream (Stream): A data stream containing encoded object data, + supporting a read method; usually a BytearrayStream object. + """ + super(QueryRequestPayload, self).read(istream) + tstream = BytearrayStream(istream.read(self.length)) + + while(self.is_tag_next(Tags.QUERY_FUNCTION, tstream)): + query_function = QueryFunction() + query_function.read(tstream) + self.query_functions.append(query_function) + + self.is_oversized(tstream) + self.validate() + + def write(self, ostream): + """ + Write the data encoding the QueryRequestPayload object to a stream. + + Args: + ostream (Stream): A data stream in which to encode object data, + supporting a write method; usually a BytearrayStream object. + """ + tstream = BytearrayStream() + + for query_function in self.query_functions: + query_function.write(tstream) + + self.length = tstream.length() + super(QueryRequestPayload, self).write(ostream) + ostream.write(tstream.buffer) + + def validate(self): + """ + Error check the attributes of the QueryRequestPayload object. + """ + self.__validate() + + def __validate(self): + if isinstance(self.query_functions, list): + for i in xrange(len(self.query_functions)): + query_function = self.query_functions[i] + if not isinstance(query_function, QueryFunction): + msg = "invalid query function ({0} in list)".format(i) + msg += "; expected {0}, received {1}".format( + QueryFunction, query_function) + raise TypeError(msg) + else: + msg = "invalid query functions list" + msg += "; expected {0}, received {1}".format( + list, self.query_functions) + raise TypeError(msg) + + +class QueryResponsePayload(Struct): + """ + A response payload for the Query operation. + + The payload contains different sets of responses that the KMIP server + provides in response to the initial Query request. See Section 4.25 of the + KMIP 1.1 specification for more information. + + Attributes: + operations: A list of Operations supported by the server. + object_types: A list of ObjectTypes supported by the server. + vendor_identification: A string identifying the server vendor. + server_information: A structure containing vendor-specific fields and + substructures. + application_namespaces: A list of application namespaces supported by + the server. + extension_information: A list of ExtensionInformation objects detailing + Objects supported by the server with ItemTag values in the + Extensions range. + """ + def __init__(self, operations=None, object_types=None, + vendor_identification=None, server_information=None, + application_namespaces=None, extension_information=None): + """ + Construct a QueryResponsePayload object. + + Args: + operations (list): A list of Operations supported by the server. + object_types (list): A list of ObjectTypes supported by the server. + vendor_identification (VendorIdentification): A string identifying + the server vendor. + server_information (ServerInformation): A structure containing + vendor-specific fields and substructures. + application_namespaces (list): A list of application namespaces + supported by the server. + extension_information (list): A list of ExtensionInformation + objects detailing Objects supported by the server with ItemTag + values in the Extensions range. + """ + super(QueryResponsePayload, self).__init__(Tags.RESPONSE_PAYLOAD) + + if operations is None: + self.operations = list() + else: + self.operations = operations + + if object_types is None: + self.object_types = list() + else: + self.object_types = object_types + + self.vendor_identification = vendor_identification + self.server_information = server_information + + if application_namespaces is None: + self.application_namespaces = list() + else: + self.application_namespaces = application_namespaces + + if extension_information is None: + self.extension_information = list() + else: + self.extension_information = extension_information + + self.validate() + + def read(self, istream): + """ + Read the data encoding the QueryResponsePayload object and decode it + into its constituent parts. + + Args: + istream (Stream): A data stream containing encoded object data, + supporting a read method; usually a BytearrayStream object. + """ + super(QueryResponsePayload, self).read(istream) + tstream = BytearrayStream(istream.read(self.length)) + + while(self.is_tag_next(Tags.OPERATION, tstream)): + operation = Operation() + operation.read(tstream) + self.operations.append(operation) + + while(self.is_tag_next(Tags.OBJECT_TYPE, tstream)): + object_type = ObjectType() + object_type.read(tstream) + self.object_types.append(object_type) + + if self.is_tag_next(Tags.VENDOR_IDENTIFICATION, tstream): + self.vendor_identification = VendorIdentification() + self.vendor_identification.read(tstream) + + if self.is_tag_next(Tags.SERVER_INFORMATION, tstream): + self.server_information = ServerInformation() + self.server_information.read(tstream) + + while(self.is_tag_next(Tags.APPLICATION_NAMESPACE, tstream)): + application_namespace = ApplicationNamespace() + application_namespace.read(tstream) + self.application_namespaces.append(application_namespace) + + while(self.is_tag_next(Tags.EXTENSION_INFORMATION, tstream)): + extension_information = ExtensionInformation() + extension_information.read(tstream) + self.extension_information.append(extension_information) + + self.is_oversized(tstream) + self.validate() + + def write(self, ostream): + """ + Write the data encoding the QueryResponsePayload object to a stream. + + Args: + ostream (Stream): A data stream in which to encode object data, + supporting a write method; usually a BytearrayStream object. + """ + tstream = BytearrayStream() + + for operation in self.operations: + operation.write(tstream) + + for object_type in self.object_types: + object_type.write(tstream) + + if self.vendor_identification is not None: + self.vendor_identification.write(tstream) + + if self.server_information is not None: + self.server_information.write(tstream) + + for application_namespace in self.application_namespaces: + application_namespace.write(tstream) + + for extension_information in self.extension_information: + extension_information.write(tstream) + + self.length = tstream.length() + super(QueryResponsePayload, self).write(ostream) + ostream.write(tstream.buffer) + + def validate(self): + """ + Error check the attributes of the QueryRequestPayload object. + """ + self.__validate() + + def __validate(self): + # TODO (peter-hamilton) Add separate validate_list function for this + if isinstance(self.operations, list): + for i in xrange(len(self.operations)): + operation = self.operations[i] + if not isinstance(operation, Operation): + msg = "invalid operation ({0} in list)".format(i) + msg += "; expected {0}, received {1}".format( + Operation, operation) + raise TypeError(msg) + else: + msg = "invalid operations list" + msg += "; expected {0}, received {1}".format( + list, self.operations) + raise TypeError(msg) + + if isinstance(self.object_types, list): + for i in xrange(len(self.object_types)): + object_type = self.object_types[i] + if not isinstance(object_type, ObjectType): + msg = "invalid object type ({0} in list)".format(i) + msg += "; expected {0}, received {1}".format( + ObjectType, object_type) + raise TypeError(msg) + else: + msg = "invalid object types list" + msg += "; expected {0}, received {1}".format( + list, self.object_types) + raise TypeError(msg) + + if self.vendor_identification is not None: + if not isinstance(self.vendor_identification, + VendorIdentification): + msg = "invalid vendor identification" + msg += "; expected {0}, received {1}".format( + VendorIdentification, self.vendor_identification) + raise TypeError(msg) + + if self.server_information is not None: + if not isinstance(self.server_information, ServerInformation): + msg = "invalid server information" + msg += "; expected {0}, received {1}".format( + ServerInformation, self.server_information) + raise TypeError(msg) + + if isinstance(self.application_namespaces, list): + for i in xrange(len(self.application_namespaces)): + application_namespace = self.application_namespaces[i] + if not isinstance(application_namespace, ApplicationNamespace): + msg = "invalid application namespace ({0} in list)".format( + i) + msg += "; expected {0}, received {1}".format( + ApplicationNamespace, application_namespace) + raise TypeError(msg) + else: + msg = "invalid application namespaces list" + msg += "; expected {0}, received {1}".format( + list, self.application_namespaces) + raise TypeError(msg) + + if isinstance(self.extension_information, list): + for i in xrange(len(self.extension_information)): + extension_information = self.extension_information[i] + if not isinstance(extension_information, ExtensionInformation): + msg = "invalid extension information ({0} in list)".format( + i) + msg += "; expected {0}, received {1}".format( + ExtensionInformation, extension_information) + raise TypeError(msg) + else: + msg = "invalid extension information list" + msg += "; expected {0}, received {1}".format( + list, self.extension_information) + raise TypeError(msg) diff --git a/kmip/core/misc.py b/kmip/core/misc.py index 3014864..69cf936 100644 --- a/kmip/core/misc.py +++ b/kmip/core/misc.py @@ -13,11 +13,172 @@ # License for the specific language governing permissions and limitations # under the License. -from kmip.core import enums -from kmip.core import primitives +from kmip.core.enums import Tags +from kmip.core.enums import QueryFunction as QueryFunctionEnum + +from kmip.core.primitives import Enumeration +from kmip.core.primitives import Interval +from kmip.core.primitives import Struct +from kmip.core.primitives import TextString + +from kmip.core.utils import BytearrayStream -class Offset(primitives.Interval): +class Offset(Interval): + """ + An integer representing a positive change in time. - def __init__(self, value=None, tag=enums.Tags.OFFSET): - super(Offset, self).__init__(value, tag) + Used by Rekey and Recertify requests to indicate the time difference + between the InitializationDate and the ActivationDate of the replacement + item to be created. See Sections 4.4, 4.5, and 4.8 of the KMIP v1.1 + specification for more information. + """ + + def __init__(self, value=None): + """ + Construct an Offset object. + + Args: + value (int): An integer representing a positive change in time. + Optional, defaults to None. + """ + super(Offset, self).__init__(value, Tags.OFFSET) + + +class QueryFunction(Enumeration): + """ + An encodeable wrapper for the QueryFunction enumeration. + + Used by Query requests to specify the information to retrieve from the + KMIP server. See Sections 4.25 and 9.1.3.2.24 of the KMIP v1.1 + specification for more information. + """ + ENUM_TYPE = QueryFunctionEnum + + def __init__(self, value=None): + """ + Construct a QueryFunction object. + + Args: + value (QueryFunction enum): A QueryFunction enumeration value, + (e.g., QueryFunction.QUERY_OPERATIONS). Optional, default to + None. + """ + super(QueryFunction, self).__init__(value, Tags.QUERY_FUNCTION) + + +class VendorIdentification(TextString): + """ + A text string uniquely identifying a KMIP vendor. + + Returned by KMIP servers upon receipt of a Query request for server + information. See Section 4.25 of the KMIP v1.1. specification for more + information. + """ + + def __init__(self, value=None): + """ + Construct a VendorIdentification object. + + Args: + value (str): A string describing a KMIP vendor. Optional, defaults + to None. + """ + super(VendorIdentification, self).__init__( + value, Tags.VENDOR_IDENTIFICATION) + + +class ServerInformation(Struct): + """ + A structure containing vendor-specific fields and/or substructures. + + Returned by KMIP servers upon receipt of a Query request for server + information. See Section 4.25 of the KMIP v1.1 specification for more + information. + + Note: + There are no example structures nor data encodings in the KMIP + documentation of this object. Therefore this class handles encoding and + decoding its data in a generic way, using a BytearrayStream for primary + storage. The intent is for vendor-specific subclasses to decide how to + decode this data from the stream attribute. Likewise, these subclasses + must decide how to encode their data into the stream attribute. There + are no arguments to the constructor and therefore no means by which to + validate the object's contents. + """ + + def __init__(self): + """ + Construct a ServerInformation object. + """ + super(ServerInformation, self).__init__(Tags.SERVER_INFORMATION) + + self.data = BytearrayStream() + + self.validate() + + def read(self, istream): + """ + Read the data encoding the ServerInformation object and decode it into + its constituent parts. + + Args: + istream (Stream): A data stream containing encoded object data, + supporting a read method; usually a BytearrayStream object. + """ + super(ServerInformation, self).read(istream) + tstream = BytearrayStream(istream.read(self.length)) + + self.data = BytearrayStream(tstream.read()) + + self.is_oversized(tstream) + self.validate() + + def write(self, ostream): + """ + Write the data encoding the ServerInformation object to a stream. + + Args: + ostream (Stream): A data stream in which to encode object data, + supporting a write method; usually a BytearrayStream object. + """ + tstream = BytearrayStream() + tstream.write(self.data.buffer) + + self.length = tstream.length() + super(ServerInformation, self).write(ostream) + ostream.write(tstream.buffer) + + def validate(self): + """ + Error check the types of the different parts of the ServerInformation + object. + """ + self.__validate() + + def __validate(self): + # NOTE (peter-hamilton): Intentional pass, no way to validate data. + pass + + def __eq__(self, other): + if isinstance(other, ServerInformation): + if len(self.data) != len(other.data): + return False + elif self.data != other.data: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, ServerInformation): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + return "ServerInformation()" + + def __str__(self): + return str(self.data) diff --git a/kmip/core/objects.py b/kmip/core/objects.py index 5d270c6..597cdaf 100644 --- a/kmip/core/objects.py +++ b/kmip/core/objects.py @@ -921,3 +921,244 @@ class PublicKeyTemplateAttribute(TemplateAttribute): attributes=None): super(PublicKeyTemplateAttribute, self).__init__( names, attributes, Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE) + + +# 2.1.9 +class ExtensionName(TextString): + """ + The name of an extended Object. + + A part of ExtensionInformation, specifically identifying an Object that is + a custom vendor addition to the KMIP specification. See Section 2.1.9 of + the KMIP 1.1 specification for more information. + + Attributes: + value: The string data representing the extension name. + """ + def __init__(self, value=''): + """ + Construct an ExtensionName object. + + Args: + value (str): The string data representing the extension name. + Optional, defaults to the empty string. + """ + super(ExtensionName, self).__init__(value, Tags.EXTENSION_NAME) + + +class ExtensionTag(Integer): + """ + The tag of an extended Object. + + A part of ExtensionInformation. See Section 2.1.9 of the KMIP 1.1 + specification for more information. + + Attributes: + value: The tag number identifying the extended object. + """ + def __init__(self, value=0): + """ + Construct an ExtensionTag object. + + Args: + value (int): A number representing the extension tag. Often + displayed in hex format. Optional, defaults to 0. + """ + super(ExtensionTag, self).__init__(value, Tags.EXTENSION_TAG) + + +class ExtensionType(Integer): + """ + The type of an extended Object. + + A part of ExtensionInformation, specifically identifying the type of the + Object in the specification extension. See Section 2.1.9 of the KMIP 1.1 + specification for more information. + + Attributes: + value: The type enumeration for the extended object. + """ + def __init__(self, value=None): + """ + Construct an ExtensionType object. + + Args: + value (Types): A number representing a Types enumeration value, + indicating the type of the extended Object. Optional, defaults + to None. + """ + super(ExtensionType, self).__init__(value, Tags.EXTENSION_TYPE) + + +class ExtensionInformation(Struct): + """ + A structure describing Objects defined in KMIP specification extensions. + + It is used specifically for Objects with Item Tag values in the Extensions + range and appears in responses to Query requests for server extension + information. See Sections 2.1.9 and 4.25 of the KMIP 1.1 specification for + more information. + + Attributes: + extension_name: The name of the extended Object. + extension_tag: The tag of the extended Object. + extension_type: The type of the extended Object. + """ + def __init__(self, extension_name=None, extension_tag=None, + extension_type=None): + """ + Construct an ExtensionInformation object. + + Args: + extension_name (ExtensionName): The name of the extended Object. + extension_tag (ExtensionTag): The tag of the extended Object. + extension_type (ExtensionType): The type of the extended Object. + """ + super(ExtensionInformation, self).__init__(Tags.EXTENSION_INFORMATION) + + if extension_name is None: + self.extension_name = ExtensionName() + else: + self.extension_name = extension_name + + self.extension_tag = extension_tag + self.extension_type = extension_type + + self.validate() + + def read(self, istream): + """ + Read the data encoding the ExtensionInformation object and decode it + into its constituent parts. + + Args: + istream (Stream): A data stream containing encoded object data, + supporting a read method; usually a BytearrayStream object. + """ + super(ExtensionInformation, self).read(istream) + tstream = BytearrayStream(istream.read(self.length)) + + self.extension_name.read(tstream) + + if self.is_tag_next(Tags.EXTENSION_TAG, tstream): + self.extension_tag = ExtensionTag() + self.extension_tag.read(tstream) + if self.is_tag_next(Tags.EXTENSION_TYPE, tstream): + self.extension_type = ExtensionType() + self.extension_type.read(tstream) + + self.is_oversized(tstream) + self.validate() + + def write(self, ostream): + """ + Write the data encoding the ExtensionInformation object to a stream. + + Args: + ostream (Stream): A data stream in which to encode object data, + supporting a write method; usually a BytearrayStream object. + """ + tstream = BytearrayStream() + + self.extension_name.write(tstream) + + if self.extension_tag is not None: + self.extension_tag.write(tstream) + if self.extension_type is not None: + self.extension_type.write(tstream) + + self.length = tstream.length() + super(ExtensionInformation, self).write(ostream) + ostream.write(tstream.buffer) + + def validate(self): + """ + Error check the attributes of the ExtensionInformation object. + """ + self.__validate() + + def __validate(self): + if not isinstance(self.extension_name, ExtensionName): + msg = "invalid extension name" + msg += "; expected {0}, received {1}".format( + ExtensionName, self.extension_name) + raise TypeError(msg) + + if self.extension_tag is not None: + if not isinstance(self.extension_tag, ExtensionTag): + msg = "invalid extension tag" + msg += "; expected {0}, received {1}".format( + ExtensionTag, self.extension_tag) + raise TypeError(msg) + + if self.extension_type is not None: + if not isinstance(self.extension_type, ExtensionType): + msg = "invalid extension type" + msg += "; expected {0}, received {1}".format( + ExtensionType, self.extension_type) + raise TypeError(msg) + + def __eq__(self, other): + if isinstance(other, ExtensionInformation): + if self.extension_name != other.extension_name: + return False + elif self.extension_tag != other.extension_tag: + return False + elif self.extension_type != other.extension_type: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, ExtensionInformation): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + name = "extension_name={0}".format(repr(self.extension_name)) + tag = "extension_tag={0}".format(repr(self.extension_tag)) + typ = "extension_type={0}".format(repr(self.extension_type)) + return "ExtensionInformation({0}, {1}, {2})".format(name, tag, typ) + + def __str__(self): + return repr(self) + + @classmethod + def create(cls, extension_name=None, extension_tag=None, + extension_type=None): + """ + Construct an ExtensionInformation object from provided extension + values. + + Args: + extension_name (str): The name of the extension. Optional, + defaults to None. + extension_tag (int): The tag number of the extension. Optional, + defaults to None. + extension_type (int): The type index of the extension. Optional, + defaults to None. + + Returns: + ExtensionInformation: The newly created set of extension + information. + + Example: + >>> x = ExtensionInformation.create('extension', 1, 1) + >>> x.extension_name.value + ExtensionName(value='extension') + >>> x.extension_tag.value + ExtensionTag(value=1) + >>> x.extension_type.value + ExtensionType(value=1) + """ + extension_name = ExtensionName(extension_name) + extension_tag = ExtensionTag(extension_tag) + extension_type = ExtensionType(extension_type) + + return ExtensionInformation( + extension_name=extension_name, + extension_tag=extension_tag, + extension_type=extension_type) diff --git a/kmip/core/primitives.py b/kmip/core/primitives.py index 08abfa4..d4da2a5 100644 --- a/kmip/core/primitives.py +++ b/kmip/core/primitives.py @@ -156,8 +156,9 @@ class Struct(Base): def __init__(self, tag=Tags.DEFAULT): super(Struct, self).__init__(tag, type=Types.STRUCTURE) + # NOTE (peter-hamilton) If seen, should indicate repr needs to be defined def __repr__(self): - return '' + return "Struct()" class Integer(Base): @@ -216,7 +217,10 @@ class Integer(Base): num_bytes) def __repr__(self): - return '' % (self.value) + return "{0}(value={1})".format(type(self).__name__, repr(self.value)) + + def __str__(self): + return "{0}".format(repr(self.value)) def __eq__(self, other): if isinstance(other, Integer): @@ -413,7 +417,11 @@ class Enumeration(Integer): Enum, type(self.enum))) def __repr__(self): - return '' % (self.enum.name, self.enum.value) + return "Enumeration(value={0})".format(self.enum) + + def __str__(self): + return "{0} - {1} - {2}".format( + type(self.enum), self.enum.name, self.enum.value) class Boolean(Base): @@ -550,7 +558,10 @@ class TextString(Base): data_type)) def __repr__(self): - return '' % (self.value) + return "{0}(value={1})".format(type(self).__name__, repr(self.value)) + + def __str__(self): + return "{0}".format(repr(self.value)) def __eq__(self, other): if isinstance(other, TextString): diff --git a/kmip/core/utils.py b/kmip/core/utils.py index adb4ca2..41b3882 100644 --- a/kmip/core/utils.py +++ b/kmip/core/utils.py @@ -66,7 +66,7 @@ def build_er_error(class_object, descriptor, expected, received, class BytearrayStream(io.RawIOBase): def __init__(self, data=None): if data is None: - self.buffer = b'' + self.buffer = bytes() else: self.buffer = bytes(data) @@ -81,10 +81,11 @@ class BytearrayStream(io.RawIOBase): return data def readall(self): - data = self.buffer[0:] - self.buffer = self.buffer[len(self.buffer):] + data = self.buffer + self.buffer = bytes() return data + # TODO (peter-hamilton) Unused, add documentation or cut. def readinto(self, b): if len(b) <= len(self.buffer): num_bytes_to_read = len(b) @@ -117,6 +118,17 @@ class BytearrayStream(io.RawIOBase): def __eq__(self, other): if isinstance(other, BytearrayStream): - return (self.buffer == other.buffer) + if len(self.buffer) != len(other.buffer): + return False + elif self.buffer != other.buffer: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, BytearrayStream): + return not (self == other) else: return NotImplemented diff --git a/kmip/demos/units/query.py b/kmip/demos/units/query.py new file mode 100644 index 0000000..8065385 --- /dev/null +++ b/kmip/demos/units/query.py @@ -0,0 +1,110 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +from six.moves import xrange + +from kmip.core.enums import Operation +from kmip.core.enums import QueryFunction as QueryFunctionEnum +from kmip.core.enums import ResultStatus + +from kmip.core.misc import QueryFunction + +from kmip.demos import utils + +from kmip.services.kmip_client import KMIPProxy + + +if __name__ == '__main__': + # Build and parse arguments + parser = utils.build_cli_parser(Operation.QUERY) + opts, args = parser.parse_args(sys.argv[1:]) + + username = opts.username + password = opts.password + + # Build and setup logging and needed factories + f_log = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, + 'logconfig.ini') + logging.config.fileConfig(f_log) + logger = logging.getLogger(__name__) + + # Build query function list. + query_functions = list() + query_functions.append( + QueryFunction(QueryFunctionEnum.QUERY_OPERATIONS)) + query_functions.append( + QueryFunction(QueryFunctionEnum.QUERY_OBJECTS)) + query_functions.append( + QueryFunction(QueryFunctionEnum.QUERY_SERVER_INFORMATION)) + query_functions.append( + QueryFunction(QueryFunctionEnum.QUERY_APPLICATION_NAMESPACES)) + query_functions.append( + QueryFunction(QueryFunctionEnum.QUERY_EXTENSION_LIST)) + query_functions.append( + QueryFunction(QueryFunctionEnum.QUERY_EXTENSION_MAP)) + + # Build the client and connect to the server + client = KMIPProxy() + client.open() + + result = client.query(query_functions=query_functions) + client.close() + + # Display operation results + logger.info('query() result status: {0}'.format( + result.result_status.enum)) + + if result.result_status.enum == ResultStatus.SUCCESS: + operations = result.operations + object_types = result.object_types + vendor_identification = result.vendor_identification + server_information = result.server_information + application_namespaces = result.application_namespaces + extension_information = result.extension_information + + logger.info('number of operations supported: {0}'.format( + len(operations))) + for i in xrange(len(operations)): + logger.info('operation supported: {0}'.format(operations[i])) + + logger.info('number of object types supported: {0}'.format( + len(object_types))) + for i in xrange(len(object_types)): + logger.info('object type supported: {0}'.format(object_types[i])) + + logger.info('vendor identification: {0}'.format(vendor_identification)) + logger.info('server information: {0}'.format(server_information)) + + logger.info('number of application namespaces supported: {0}'.format( + len(application_namespaces))) + for i in xrange(len(application_namespaces)): + logger.info('application namespace supported: {0}'.format( + application_namespaces[i])) + + logger.info('number of extensions supported: {0}'.format( + len(extension_information))) + for i in xrange(len(extension_information)): + logger.info('extension supported: {0}'.format( + extension_information[i])) + + else: + logger.info('query() result reason: {0}'.format( + result.result_reason.enum)) + logger.info('query() result message: {0}'.format( + result.result_message.value)) diff --git a/kmip/demos/utils.py b/kmip/demos/utils.py index f678c4a..77f45cb 100644 --- a/kmip/demos/utils.py +++ b/kmip/demos/utils.py @@ -119,6 +119,8 @@ def build_cli_parser(operation): default=None, dest="length", help="Key length in bits (e.g., 128, 256)") + elif operation is Operation.QUERY: + pass elif operation is Operation.DISCOVER_VERSIONS: pass else: diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index 8e037b5..5ed6aa1 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -19,6 +19,7 @@ from kmip.services.results import DestroyResult from kmip.services.results import DiscoverVersionsResult from kmip.services.results import GetResult from kmip.services.results import LocateResult +from kmip.services.results import QueryResult from kmip.services.results import RegisterResult from kmip.services.results import RekeyKeyPairResult @@ -45,6 +46,7 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -176,6 +178,31 @@ class KMIPProxy(KMIP): object_group_member=object_group_member, attributes=attributes, credential=credential) + def query(self, batch=False, query_functions=None, credential=None): + """ + Send a Query request to the server. + + Args: + batch (boolean): A flag indicating if the operation should be sent + with a batch of additional operations. Defaults to False. + query_functions (list): A list of QueryFunction enumerations + indicating what information the client wants from the server. + Optional, defaults to None. + credential (Credential): A Credential object containing + authentication information for the server. Optional, defaults + to None. + """ + batch_item = self._build_query_batch_item(query_functions) + + # TODO (peter-hamilton): Replace this with official client batch mode. + if batch: + self.batch_items.append(batch_item) + else: + request = self._build_request_message(credential, [batch_item]) + response = self._send_and_receive_message(request) + results = self._process_batch_items(response) + return results[0] + def discover_versions(self, batch=False, protocol_versions=None, credential=None): batch_item = self._build_discover_versions_batch_item( @@ -257,6 +284,13 @@ class KMIPProxy(KMIP): operation=operation, request_payload=payload) return batch_item + def _build_query_batch_item(self, query_functions=None): + operation = Operation(OperationEnum.QUERY) + payload = query.QueryRequestPayload(query_functions) + batch_item = messages.RequestBatchItem( + operation=operation, request_payload=payload) + return batch_item + def _build_discover_versions_batch_item(self, protocol_versions=None): operation = Operation(OperationEnum.DISCOVER_VERSIONS) @@ -281,6 +315,8 @@ class KMIPProxy(KMIP): return self._process_create_key_pair_batch_item elif operation == OperationEnum.REKEY_KEY_PAIR: return self._process_rekey_key_pair_batch_item + elif operation == OperationEnum.QUERY: + return self._process_query_batch_item elif operation == OperationEnum.DISCOVER_VERSIONS: return self._process_discover_versions_batch_item else: @@ -317,6 +353,35 @@ class KMIPProxy(KMIP): return self._process_key_pair_batch_item( batch_item, RekeyKeyPairResult) + def _process_query_batch_item(self, batch_item): + payload = batch_item.response_payload + + operations = None + object_types = None + vendor_identification = None + server_information = None + application_namespaces = None + extension_information = None + + if payload is not None: + operations = payload.operations + object_types = payload.object_types + vendor_identification = payload.vendor_identification + server_information = payload.server_information + application_namespaces = payload.application_namespaces + extension_information = payload.extension_information + + return QueryResult( + batch_item.result_status, + batch_item.result_reason, + batch_item.result_message, + operations, + object_types, + vendor_identification, + server_information, + application_namespaces, + extension_information) + def _process_discover_versions_batch_item(self, batch_item): payload = batch_item.response_payload diff --git a/kmip/services/results.py b/kmip/services/results.py index bdf754b..d68495e 100644 --- a/kmip/services/results.py +++ b/kmip/services/results.py @@ -174,6 +174,60 @@ class LocateResult(OperationResult): self.uuids = uuids +class QueryResult(OperationResult): + """ + A container for the results of a Query operation. + + Attributes: + result_status: The status of the Query operation (e.g., success or + failure). + result_reason: The reason for the operation status. + result_message: Extra information pertaining to the status reason. + operations: A list of Operations supported by the server. + object_types: A list of Object Types supported by the server. + vendor_identification: + server_information: + application_namespaces: A list of namespaces supported by the server. + extension_information: A list of extensions supported by the server. + """ + + def __init__(self, + result_status, + result_reason=None, + result_message=None, + operations=None, + object_types=None, + vendor_identification=None, + server_information=None, + application_namespaces=None, + extension_information=None): + super(QueryResult, self).__init__( + result_status, result_reason, result_message) + + if operations is None: + self.operations = list() + else: + self.operations = operations + + if object_types is None: + self.object_types = list() + else: + self.object_types = object_types + + self.vendor_identification = vendor_identification + self.server_information = server_information + + if application_namespaces is None: + self.application_namespaces = list() + else: + self.application_namespaces = application_namespaces + + if extension_information is None: + self.extension_information = list() + else: + self.extension_information = extension_information + + class DiscoverVersionsResult(OperationResult): def __init__(self, diff --git a/kmip/tests/core/factories/payloads/test_request.py b/kmip/tests/core/factories/payloads/test_request.py index 9f2f2b7..7097d53 100644 --- a/kmip/tests/core/factories/payloads/test_request.py +++ b/kmip/tests/core/factories/payloads/test_request.py @@ -24,6 +24,7 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -138,8 +139,8 @@ class TestRequestPayloadFactory(testtools.TestCase): self.factory.create, Operation.VALIDATE) def test_create_query_payload(self): - self._test_not_implemented( - self.factory.create, Operation.QUERY) + payload = self.factory.create(Operation.QUERY) + self._test_payload_type(payload, query.QueryRequestPayload) def test_create_cancel_payload(self): self._test_not_implemented( diff --git a/kmip/tests/core/factories/payloads/test_response.py b/kmip/tests/core/factories/payloads/test_response.py index b47c31c..597accb 100644 --- a/kmip/tests/core/factories/payloads/test_response.py +++ b/kmip/tests/core/factories/payloads/test_response.py @@ -24,6 +24,7 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -138,8 +139,8 @@ class TestResponsePayloadFactory(testtools.TestCase): self.factory.create, Operation.VALIDATE) def test_create_query_payload(self): - self._test_not_implemented( - self.factory.create, Operation.QUERY) + payload = self.factory.create(Operation.QUERY) + self._test_payload_type(payload, query.QueryResponsePayload) def test_create_cancel_payload(self): self._test_not_implemented( diff --git a/kmip/tests/core/messages/payloads/test_query.py b/kmip/tests/core/messages/payloads/test_query.py new file mode 100644 index 0000000..6b39e39 --- /dev/null +++ b/kmip/tests/core/messages/payloads/test_query.py @@ -0,0 +1,587 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from six.moves import xrange + +from testtools import TestCase + +from kmip.core import utils + +from kmip.core.attributes import ObjectType + +from kmip.core.enums import ObjectType as ObjectTypeEnum +from kmip.core.enums import Operation as OperationEnum +from kmip.core.enums import QueryFunction as QueryFunctionEnum + +from kmip.core.messages.contents import Operation +from kmip.core.messages.payloads import query + +from kmip.core.misc import QueryFunction +from kmip.core.misc import VendorIdentification +from kmip.core.misc import ServerInformation + +from kmip.core.objects import ExtensionInformation +from kmip.core.objects import ExtensionName + + +class TestQueryRequestPayload(TestCase): + """ + Test suite for the QueryRequestPayload class. + + Test encodings obtained from Sections 12.1 and 12.2 of the KMIP 1.1 Test + Cases documentation. + """ + + def setUp(self): + super(TestQueryRequestPayload, self).setUp() + + self.query_functions_a = list() + self.query_functions_b = list() + self.query_functions_c = list() + + self.query_functions_b.append(QueryFunction( + QueryFunctionEnum.QUERY_OPERATIONS)) + self.query_functions_b.append(QueryFunction( + QueryFunctionEnum.QUERY_OBJECTS)) + self.query_functions_b.append(QueryFunction( + QueryFunctionEnum.QUERY_SERVER_INFORMATION)) + + self.query_functions_c.append(QueryFunction( + QueryFunctionEnum.QUERY_EXTENSION_LIST)) + + self.encoding_a = utils.BytearrayStream(( + b'\x42\x00\x79\x01\x00\x00\x00\x00')) + + self.encoding_b = utils.BytearrayStream(( + b'\x42\x00\x79\x01\x00\x00\x00\x30\x42\x00\x74\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x74\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x74\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x03\x00\x00\x00\x00')) + + self.encoding_c = utils.BytearrayStream(( + b'\x42\x00\x79\x01\x00\x00\x00\x10\x42\x00\x74\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x05\x00\x00\x00\x00')) + + def tearDown(self): + super(TestQueryRequestPayload, self).tearDown() + + def test_init_with_none(self): + """ + Test that a QueryRequestPayload object can be constructed with no + specified value. + """ + query.QueryRequestPayload() + + def test_init_with_args(self): + """ + Test that a QueryRequestPayload object can be constructed with valid + values. + """ + query.QueryRequestPayload(self.query_functions_a) + query.QueryRequestPayload(self.query_functions_b) + query.QueryRequestPayload(self.query_functions_c) + + def test_validate_with_invalid_query_functions_list(self): + """ + Test that a TypeError exception is raised when an invalid QueryFunction + list is used to construct a QueryRequestPayload object. + """ + kwargs = {'query_functions': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid query functions list", + query.QueryRequestPayload, **kwargs) + + def test_validate_with_invalid_query_functions_item(self): + """ + Test that a TypeError exception is raised when an invalid QueryFunction + item is used to construct a QueryRequestPayload object. + """ + kwargs = {'query_functions': ['invalid']} + self.assertRaisesRegexp( + TypeError, "invalid query function", + query.QueryRequestPayload, **kwargs) + + def _test_read(self, stream, query_functions): + payload = query.QueryRequestPayload() + payload.read(stream) + expected = len(query_functions) + observed = len(payload.query_functions) + + msg = "query functions list decoding mismatch" + msg += "; expected {0} results, received {1}".format( + expected, observed) + self.assertEqual(expected, observed, msg) + + for i in xrange(len(query_functions)): + expected = query_functions[i] + observed = payload.query_functions[i] + + msg = "query function decoding mismatch" + msg += "; expected {0}, received {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_read_with_empty_query_functions_list(self): + """ + Test that a QueryRequestPayload object with no data can be read from + a data stream. + """ + self._test_read(self.encoding_a, self.query_functions_a) + + def test_read_with_multiple_query_functions(self): + """ + Test that a QueryRequestPayload object with multiple pieces of data + can be read from a data stream. + """ + self._test_read(self.encoding_b, self.query_functions_b) + + def test_read_with_one_query_function(self): + """ + Test that a QueryRequestPayload object with a single piece of data can + be read from a data stream. + """ + self._test_read(self.encoding_c, self.query_functions_c) + + def _test_write(self, encoding, query_functions): + stream = utils.BytearrayStream() + payload = query.QueryRequestPayload(query_functions) + payload.write(stream) + + length_expected = len(encoding) + length_received = len(stream) + + msg = "encoding lengths not equal" + msg += "; expected {0}, received {1}".format( + length_expected, length_received) + self.assertEqual(length_expected, length_received, msg) + + msg = "encoding mismatch" + msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(encoding, stream) + + self.assertEqual(encoding, stream, msg) + + def test_write_with_empty_query_functions_list(self): + """ + Test that a QueryRequestPayload object with no data can be written to + a data stream. + """ + self._test_write(self.encoding_a, self.query_functions_a) + + def test_write_with_multiple_query_functions(self): + """ + Test that a QueryRequestPayload object with multiple pieces of data + can be written to a data stream. + """ + self._test_write(self.encoding_b, self.query_functions_b) + + def test_write_with_one_query_function(self): + """ + Test that a QueryRequestPayload object with a single piece of data can + be written to a data stream. + """ + self._test_write(self.encoding_c, self.query_functions_c) + + +class TestQueryResponsePayload(TestCase): + """ + Test encodings obtained from Sections 12.1 and 12.2 of the KMIP 1.1 Test + Cases documentation. + """ + + def setUp(self): + super(TestQueryResponsePayload, self).setUp() + + self.operations = list() + self.object_types = list() + self.application_namespaces = list() + self.extension_information = list() + + self.vendor_identification = VendorIdentification( + "IBM test server, not-TKLM 2.0.1.1 KMIP 2.0.0.1") + self.server_information = ServerInformation() + + self.operations.append(Operation(OperationEnum.CREATE)) + self.operations.append(Operation(OperationEnum.CREATE_KEY_PAIR)) + self.operations.append(Operation(OperationEnum.REGISTER)) + self.operations.append(Operation(OperationEnum.REKEY)) + self.operations.append(Operation(OperationEnum.CERTIFY)) + self.operations.append(Operation(OperationEnum.RECERTIFY)) + self.operations.append(Operation(OperationEnum.LOCATE)) + self.operations.append(Operation(OperationEnum.CHECK)) + self.operations.append(Operation(OperationEnum.GET)) + self.operations.append(Operation(OperationEnum.GET_ATTRIBUTES)) + self.operations.append(Operation(OperationEnum.GET_ATTRIBUTE_LIST)) + self.operations.append(Operation(OperationEnum.ADD_ATTRIBUTE)) + self.operations.append(Operation(OperationEnum.MODIFY_ATTRIBUTE)) + self.operations.append(Operation(OperationEnum.DELETE_ATTRIBUTE)) + self.operations.append(Operation(OperationEnum.OBTAIN_LEASE)) + self.operations.append(Operation(OperationEnum.GET_USAGE_ALLOCATION)) + self.operations.append(Operation(OperationEnum.ACTIVATE)) + self.operations.append(Operation(OperationEnum.REVOKE)) + self.operations.append(Operation(OperationEnum.DESTROY)) + self.operations.append(Operation(OperationEnum.ARCHIVE)) + self.operations.append(Operation(OperationEnum.RECOVER)) + self.operations.append(Operation(OperationEnum.QUERY)) + self.operations.append(Operation(OperationEnum.CANCEL)) + self.operations.append(Operation(OperationEnum.POLL)) + self.operations.append(Operation(OperationEnum.REKEY_KEY_PAIR)) + self.operations.append(Operation(OperationEnum.DISCOVER_VERSIONS)) + + self.object_types.append(ObjectType(ObjectTypeEnum.CERTIFICATE)) + self.object_types.append(ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)) + self.object_types.append(ObjectType(ObjectTypeEnum.PUBLIC_KEY)) + self.object_types.append(ObjectType(ObjectTypeEnum.PRIVATE_KEY)) + self.object_types.append(ObjectType(ObjectTypeEnum.TEMPLATE)) + self.object_types.append(ObjectType(ObjectTypeEnum.SECRET_DATA)) + + self.extension_information.append(ExtensionInformation( + extension_name=ExtensionName("ACME LOCATION"))) + self.extension_information.append(ExtensionInformation( + extension_name=ExtensionName("ACME ZIP CODE"))) + + self.encoding_a = utils.BytearrayStream(( + b'\x42\x00\x7C\x01\x00\x00\x00\x00')) + + self.encoding_b = utils.BytearrayStream(( + b'\x42\x00\x7C\x01\x00\x00\x02\x40\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x03\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x04\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x06\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x07\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x08\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x09\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x0A\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x0B\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x0C\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x0D\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x0E\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x0F\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x10\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x11\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x12\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x13\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x14\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x15\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x16\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x18\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x19\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x1A\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x1D\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x1E\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x03\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x04\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x06\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04' + b'\x00\x00\x00\x07\x00\x00\x00\x00\x42\x00\x9D\x07\x00\x00\x00\x2E' + b'\x49\x42\x4D\x20\x74\x65\x73\x74\x20\x73\x65\x72\x76\x65\x72\x2C' + b'\x20\x6E\x6F\x74\x2D\x54\x4B\x4C\x4D\x20\x32\x2E\x30\x2E\x31\x2E' + b'\x31\x20\x4B\x4D\x49\x50\x20\x32\x2E\x30\x2E\x30\x2E\x31\x00\x00' + b'\x42\x00\x88\x01\x00\x00\x00\x00')) + + self.encoding_c = utils.BytearrayStream(( + b'\x42\x00\x7C\x01\x00\x00\x00\x40\x42\x00\xA4\x01\x00\x00\x00\x18' + b'\x42\x00\xA5\x07\x00\x00\x00\x0D\x41\x43\x4D\x45\x20\x4C\x4F\x43' + b'\x41\x54\x49\x4F\x4E\x00\x00\x00\x42\x00\xA4\x01\x00\x00\x00\x18' + b'\x42\x00\xA5\x07\x00\x00\x00\x0D\x41\x43\x4D\x45\x20\x5A\x49\x50' + b'\x20\x43\x4F\x44\x45\x00\x00\x00')) + + def tearDown(self): + super(TestQueryResponsePayload, self).tearDown() + + def test_init_with_none(self): + """ + Test that a QueryResponsePayload object can be constructed with no + specified value. + """ + query.QueryResponsePayload() + + def test_init_with_args(self): + """ + Test that a QueryResponsePayload object can be constructed with valid + values. + """ + query.QueryResponsePayload( + operations=self.operations, + object_types=self.object_types, + vendor_identification=self.vendor_identification, + server_information=self.server_information, + application_namespaces=self.application_namespaces, + extension_information=self.extension_information) + + def test_validate_with_invalid_operations_list(self): + """ + Test that a TypeError exception is raised when an invalid Operations + list is used to construct a QueryResponsePayload object. + """ + kwargs = {'operations': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid operations list", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_operations_item(self): + """ + Test that a TypeError exception is raised when an invalid Operations + item is used to construct a QueryResponsePayload object. + """ + kwargs = {'operations': ['invalid']} + self.assertRaisesRegexp( + TypeError, "invalid operation", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_object_types_list(self): + """ + Test that a TypeError exception is raised when an invalid ObjectTypes + list is used to construct a QueryResponsePayload object. + """ + kwargs = {'object_types': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid object types list", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_object_types_item(self): + """ + Test that a TypeError exception is raised when an invalid ObjectTypes + item is used to construct a QueryResponsePayload object. + """ + kwargs = {'object_types': ['invalid']} + self.assertRaisesRegexp( + TypeError, "invalid object type", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_vendor_identification(self): + """ + Test that a TypeError exception is raised when an invalid + VendorIdentification item is used to construct a QueryResponsePayload + object. + """ + kwargs = {'vendor_identification': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid vendor identification", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_server_information(self): + """ + Test that a TypeError exception is raised when an invalid + ServerInformation item is used to construct a QueryResponsePayload + object. + """ + kwargs = {'server_information': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid server information", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_application_namespaces_list(self): + """ + Test that a TypeError exception is raised when an invalid + ApplicationNamespaces list is used to construct a QueryResponsePayload + object. + """ + kwargs = {'application_namespaces': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid application namespaces list", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_application_namespaces_item(self): + """ + Test that a TypeError exception is raised when an invalid + ApplicationNamespaces item is used to construct a QueryResponsePayload + object. + """ + kwargs = {'application_namespaces': ['invalid']} + self.assertRaisesRegexp( + TypeError, "invalid application namespace", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_extension_information_list(self): + """ + Test that a TypeError exception is raised when an invalid + ExtensionInformation list is used to construct a QueryResponsePayload + object. + """ + kwargs = {'extension_information': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid extension information list", + query.QueryResponsePayload, **kwargs) + + def test_validate_with_invalid_extension_information_item(self): + """ + Test that a TypeError exception is raised when an invalid + ExtensionInformation item is used to construct a QueryResponsePayload + object. + """ + kwargs = {'extension_information': ['invalid']} + self.assertRaisesRegexp( + TypeError, "invalid extension information", + query.QueryResponsePayload, **kwargs) + + def _test_read(self, stream, operations, object_types, + vendor_identification, server_information, + application_namespaces, extension_information): + payload = query.QueryResponsePayload() + payload.read(stream) + + # Test decoding of all operations. + expected = len(operations) + observed = len(payload.operations) + + msg = "operations list decoding mismatch" + msg += "; expected {0} results, received {1}".format( + expected, observed) + self.assertEqual(expected, observed, msg) + + for i in xrange(len(operations)): + expected = operations[i] + observed = payload.operations[i] + + msg = "operation decoding mismatch" + msg += "; expected {0}, received {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + # Test decoding of all object types. + expected = len(object_types) + observed = len(payload.object_types) + + msg = "object types list decoding mismatch" + msg += "; expected {0} results, received {1}".format( + expected, observed) + self.assertEqual(expected, observed, msg) + + for i in xrange(len(object_types)): + expected = object_types[i] + observed = payload.object_types[i] + + msg = "object type decoding mismatch" + msg += "; expected {0}, received {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + # Test decoding of vendor identification. + expected = vendor_identification + observed = payload.vendor_identification + + msg = "vendor identification decoding mismatch" + msg += "; expected {0}, received {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + # Test decoding of server information. + expected = server_information + observed = payload.server_information + + msg = "server information decoding mismatch" + msg += "; expected {0}, received {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + # Test decoding of all application namespaces. + expected = len(application_namespaces) + observed = len(payload.application_namespaces) + + msg = "application namespaces list decoding mismatch" + msg += "; expected {0} results, received {1}".format( + expected, observed) + self.assertEqual(expected, observed, msg) + + # Test decoding of all extension information. + expected = len(extension_information) + observed = len(payload.extension_information) + + msg = "extension information list decoding mismatch" + msg += "; expected {0} results, received {1}".format( + expected, observed) + self.assertEqual(expected, observed, msg) + + for i in xrange(len(extension_information)): + expected = extension_information[i] + observed = payload.extension_information[i] + + msg = "extension information decoding mismatch" + msg += "; expected {0}, received {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_read_with_no_data(self): + """ + Test that a QueryResponsePayload object with no data can be read from + a data stream. + """ + self._test_read( + self.encoding_a, list(), list(), None, None, list(), list()) + + def test_read_with_operations_object_types_and_server_info(self): + """ + Test that a QueryResponsePayload object with multiple pieces of data + can be read from a data stream. + """ + self._test_read( + self.encoding_b, self.operations, self.object_types, + self.vendor_identification, self.server_information, + self.application_namespaces, list()) + + def test_read_with_extension_information(self): + """ + Test that a QueryResponsePayload object with one piece of data can be + read from a data stream. + """ + self._test_read( + self.encoding_c, list(), list(), None, None, + self.application_namespaces, self.extension_information) + + def _test_write(self, encoding, operations, object_types, + vendor_identification, server_information, + application_namespaces, extension_information): + stream = utils.BytearrayStream() + payload = query.QueryResponsePayload( + operations, object_types, vendor_identification, + server_information, application_namespaces, extension_information) + payload.write(stream) + + length_expected = len(encoding) + length_received = len(stream) + + msg = "encoding lengths not equal" + msg += "; expected {0}, received {1}".format( + length_expected, length_received) + self.assertEqual(length_expected, length_received, msg) + + msg = "encoding mismatch" + msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(encoding, stream) + + self.assertEqual(encoding, stream, msg) + + def test_write_with_no_data(self): + """ + Test that a QueryResponsePayload object with no data can be written to + a data stream. + """ + self._test_write( + self.encoding_a, list(), list(), None, None, list(), list()) + + def test_write_with_operations_object_types_and_server_info(self): + """ + Test that a QueryResponsePayload object with multiple pieces of data + can be written to a data stream. + """ + self._test_write( + self.encoding_b, self.operations, self.object_types, + self.vendor_identification, self.server_information, + self.application_namespaces, list()) + + def test_write_with_extension_information(self): + """ + Test that a QueryResponsePayload object with one piece of data can be + written to a data stream. + """ + self._test_write( + self.encoding_c, list(), list(), None, None, + self.application_namespaces, self.extension_information) diff --git a/kmip/tests/core/misc/__init__.py b/kmip/tests/core/misc/__init__.py new file mode 100644 index 0000000..417e2f9 --- /dev/null +++ b/kmip/tests/core/misc/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/kmip/tests/core/misc/test_misc.py b/kmip/tests/core/misc/test_misc.py new file mode 100644 index 0000000..443dacb --- /dev/null +++ b/kmip/tests/core/misc/test_misc.py @@ -0,0 +1,117 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from six import string_types +from testtools import TestCase + +from kmip.core.enums import QueryFunction as QueryFunctionEnum + +from kmip.core.misc import QueryFunction +from kmip.core.misc import VendorIdentification + + +class TestQueryFunction(TestCase): + """ + A test suite for the QueryFunction class. + + Since QueryFunction is a simple wrapper for the Enumeration primitive, + only a few tests pertaining to construction are needed. + """ + + def setUp(self): + super(TestQueryFunction, self).setUp() + + def tearDown(self): + super(TestQueryFunction, self).tearDown() + + def _test_init(self, value): + if (isinstance(value, QueryFunctionEnum)) or (value is None): + query_function = QueryFunction(value) + + msg = "expected {0}, observed {1}".format( + value, query_function.enum) + self.assertEqual(value, query_function.enum, msg) + else: + self.assertRaises(TypeError, QueryFunction, value) + + def test_init_with_none(self): + """ + Test that a QueryFunction object can be constructed with no specified + value. + """ + self._test_init(None) + + def test_init_with_valid(self): + """ + Test that a QueryFunction object can be constructed with a valid + QueryFunction enumeration value. + """ + self._test_init(QueryFunctionEnum.QUERY_OBJECTS) + + def test_init_with_invalid(self): + """ + Test that a TypeError exception is raised when a non QueryFunction + enumeration value is used to construct a QueryFunction object. + """ + self._test_init("invalid") + + +class TestVendorIdentification(TestCase): + """ + A test suite for the VendorIdentification class. + + Since VendorIdentification is a simple wrapper for the TextString + primitive, only a few tests pertaining to construction are needed. + """ + + def setUp(self): + super(TestVendorIdentification, self).setUp() + + def tearDown(self): + super(TestVendorIdentification, self).tearDown() + + def _test_init(self, value): + if (isinstance(value, string_types)) or (value is None): + vendor_identification = VendorIdentification(value) + + if value is None: + value = '' + + msg = "expected {0}, observed {1}".format( + value, vendor_identification.value) + self.assertEqual(value, vendor_identification.value, msg) + else: + self.assertRaises(TypeError, VendorIdentification, value) + + def test_init_with_none(self): + """ + Test that a VendorIdentification object can be constructed with no + specified value. + """ + self._test_init(None) + + def test_init_with_valid(self): + """ + Test that a VendorIdentification object can be constructed with a + valid, string-type value. + """ + self._test_init("valid") + + def test_init_with_invalid(self): + """ + Test that a TypeError exception is raised when a non-string value is + used to construct a VendorIdentification object. + """ + self._test_init(0) diff --git a/kmip/tests/core/misc/test_server_information.py b/kmip/tests/core/misc/test_server_information.py new file mode 100644 index 0000000..863cc2b --- /dev/null +++ b/kmip/tests/core/misc/test_server_information.py @@ -0,0 +1,252 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from six import string_types +from testtools import TestCase + +from kmip.core.misc import ServerInformation +from kmip.core.utils import BytearrayStream + + +class TestServerInformation(TestCase): + """ + A test suite for the ServerInformation class. + """ + + def setUp(self): + super(TestServerInformation, self).setUp() + + self.data = BytearrayStream(b'\x00\x01\x02\x03') + + self.encoding_a = BytearrayStream( + b'\x42\x00\x88\x01\x00\x00\x00\x00') + self.encoding_b = BytearrayStream( + b'\x42\x00\x88\x01\x00\x00\x00\x04\x00\x01\x02\x03') + + def tearDown(self): + super(TestServerInformation, self).tearDown() + + def test_init(self): + ServerInformation() + + def _test_read(self, stream, data): + server_information = ServerInformation() + server_information.read(stream) + + expected = data + observed = server_information.data + + msg = "data decoding mismatch" + msg += "; expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_read_with_none(self): + """ + Test that a ServerInformation object with no data can be read from a + data stream. + """ + self._test_read(self.encoding_a, BytearrayStream()) + + def test_read_with_data(self): + """ + Test that a ServerInformation object with data can be read from a + data stream. + """ + self._test_read(self.encoding_b, self.data) + + def _test_write(self, stream_expected, data): + stream_observed = BytearrayStream() + server_information = ServerInformation() + + if data is not None: + server_information.data = data + + server_information.write(stream_observed) + + length_expected = len(stream_expected) + length_observed = len(stream_observed) + + msg = "encoding lengths not equal" + msg += "; expected {0}, observed {1}".format( + length_expected, length_observed) + self.assertEqual(length_expected, length_observed, msg) + + msg = "encoding mismatch" + msg += ";\nexpected:\n{0}\nobserved:\n{1}".format( + stream_expected, stream_observed) + self.assertEqual(stream_expected, stream_observed, msg) + + def test_write_with_none(self): + """ + Test that a ServerInformation object with no data can be written to a + data stream. + """ + self._test_write(self.encoding_a, None) + + def test_write_with_data(self): + """ + Test that a ServerInformation object with data can be written to a + data stream. + """ + self._test_write(self.encoding_b, self.data) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + ServerInformation objects with the same internal data. + """ + a = ServerInformation() + b = ServerInformation() + + a.data = self.data + b.data = self.data + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_equal_and_empty(self): + """ + Test that the equality operator returns True when comparing two + ServerInformation objects with no internal data. + """ + a = ServerInformation() + b = ServerInformation() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal(self): + """ + Test that the equality operator returns False when comparing two + ServerInformation objects with different sets of internal data. + """ + a = ServerInformation() + b = ServerInformation() + + a.data = self.data + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing a + ServerInformation object to a non-ServerInformation object. + """ + a = ServerInformation() + b = "invalid" + + self.assertFalse(a == b) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing + two ServerInformation objects with the same internal data. + """ + a = ServerInformation() + b = ServerInformation() + + a.data = self.data + b.data = self.data + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_equal_and_empty(self): + """ + Test that the inequality operator returns False when comparing + two ServerInformation objects with no internal data. + """ + a = ServerInformation() + b = ServerInformation() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal(self): + """ + Test that the inequality operator returns True when comparing two + ServerInformation objects with different sets of internal data. + """ + a = ServerInformation() + b = ServerInformation() + + a.data = self.data + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing a + ServerInformation object to a non-ServerInformation object. + """ + a = ServerInformation() + b = "invalid" + + self.assertTrue(a != b) + + def test_repr(self): + """ + Test that the representation of a ServerInformation object is + formatted properly and can be used by eval to create a new + ServerInformation object. + """ + server_information = ServerInformation() + + expected = "ServerInformation()" + observed = repr(server_information) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + expected = server_information + observed = eval(observed) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def _test_str(self, data): + server_information = ServerInformation() + server_information.data = data + str_repr = str(server_information) + + expected = len(str(data)) + observed = len(str_repr) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + # TODO (peter-hamilton) This should be binary_type. Fix involves + # TODO (peter-hamilton) refining BytearrayStream implementation. + expected = string_types + observed = str_repr + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertIsInstance(observed, expected, msg) + + def test_str_with_no_data(self): + """ + Test that the string representation of a ServerInformation object + is formatted properly when there is no internal data. + """ + self._test_str(BytearrayStream()) + + def test_str_with_data(self): + """ + Test that the string representation of a ServerInformation object + is formatted properly when there is internal data. + """ + self._test_str(self.data) diff --git a/kmip/tests/core/objects/__init__.py b/kmip/tests/core/objects/__init__.py new file mode 100644 index 0000000..417e2f9 --- /dev/null +++ b/kmip/tests/core/objects/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/kmip/tests/core/objects/test_extension_information.py b/kmip/tests/core/objects/test_extension_information.py new file mode 100644 index 0000000..e3fa7d9 --- /dev/null +++ b/kmip/tests/core/objects/test_extension_information.py @@ -0,0 +1,444 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from testtools import TestCase + +from kmip.core.objects import ExtensionInformation +from kmip.core.objects import ExtensionName +from kmip.core.objects import ExtensionTag +from kmip.core.objects import ExtensionType + +from kmip.core.utils import BytearrayStream + + +class TestExtensionInformation(TestCase): + """ + A test suite for the ExtensionInformation class. + + Test encodings obtained from Section 12.2 of the KMIP 1.1 Test Cases + documentation. + """ + + def setUp(self): + super(TestExtensionInformation, self).setUp() + + self.extension_name_b = ExtensionName('ACME LOCATION') + self.extension_name_c = ExtensionName('ACME LOCATION') + self.extension_name_d = ExtensionName('ACME ZIP CODE') + + self.extension_tag_c = ExtensionTag(5548545) + self.extension_tag_d = ExtensionTag(5548546) + + self.extension_type_c = ExtensionType(7) + self.extension_type_d = ExtensionType(2) + + self.encoding_a = BytearrayStream( + b'\x42\x00\xA4\x01\x00\x00\x00\x08\x42\x00\xA5\x07\x00\x00\x00' + b'\x00') + self.encoding_b = BytearrayStream( + b'\x42\x00\xA4\x01\x00\x00\x00\x18\x42\x00\xA5\x07\x00\x00\x00\x0D' + b'\x41\x43\x4D\x45\x20\x4C\x4F\x43\x41\x54\x49\x4F\x4E\x00\x00' + b'\x00') + self.encoding_c = BytearrayStream( + b'\x42\x00\xA4\x01\x00\x00\x00\x38\x42\x00\xA5\x07\x00\x00\x00\x0D' + b'\x41\x43\x4D\x45\x20\x4C\x4F\x43\x41\x54\x49\x4F\x4E\x00\x00\x00' + b'\x42\x00\xA6\x02\x00\x00\x00\x04\x00\x54\xAA\x01\x00\x00\x00\x00' + b'\x42\x00\xA7\x02\x00\x00\x00\x04\x00\x00\x00\x07\x00\x00\x00' + b'\x00') + self.encoding_d = BytearrayStream( + b'\x42\x00\xA4\x01\x00\x00\x00\x38\x42\x00\xA5\x07\x00\x00\x00\x0D' + b'\x41\x43\x4D\x45\x20\x5A\x49\x50\x20\x43\x4F\x44\x45\x00\x00\x00' + b'\x42\x00\xA6\x02\x00\x00\x00\x04\x00\x54\xAA\x02\x00\x00\x00\x00' + b'\x42\x00\xA7\x02\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00' + b'\x00') + + def tearDown(self): + super(TestExtensionInformation, self).tearDown() + + def _test_init(self): + pass + + def test_init_with_none(self): + ExtensionInformation() + + def test_init_with_args(self): + ExtensionInformation( + extension_name=ExtensionName(), + extension_tag=ExtensionTag(), + extension_type=ExtensionType()) + + def test_validate_with_invalid_extension_name(self): + """ + Test that a TypeError exception is raised when an invalid + ExtensionName is used to construct an ExtensionInformation object. + """ + kwargs = {'extension_name': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid extension name", + ExtensionInformation, **kwargs) + + def test_validate_with_invalid_extension_tag(self): + """ + Test that a TypeError exception is raised when an invalid + ExtensionTag is used to construct an ExtensionInformation object. + """ + kwargs = {'extension_tag': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid extension tag", + ExtensionInformation, **kwargs) + + def test_validate_with_invalid_extension_type(self): + """ + Test that a TypeError exception is raised when an invalid + ExtensionType is used to construct an ExtensionInformation object. + """ + kwargs = {'extension_type': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid extension type", + ExtensionInformation, **kwargs) + + def _test_read(self, stream, extension_name, extension_tag, + extension_type): + extension_information = ExtensionInformation() + extension_information.read(stream) + + if extension_name is None: + extension_name = ExtensionName() + + msg = "extension name encoding mismatch" + msg += "; expected {0}, observed {1}".format( + extension_name, + extension_information.extension_name) + self.assertEqual( + extension_name, + extension_information.extension_name, msg) + + msg = "extension tag encoding mismatch" + msg += "; expected {0}, observed {1}".format( + extension_tag, + extension_information.extension_tag) + self.assertEqual( + extension_tag, + extension_information.extension_tag, msg) + + msg = "extension type encoding mismatch" + msg += "; expected {0}, observed {1}".format( + extension_type, + extension_information.extension_type) + self.assertEqual( + extension_type, + extension_information.extension_type, msg) + + def test_read_with_none(self): + """ + Test that an ExtensionInformation object with no data can be read from + a data stream. + """ + self._test_read(self.encoding_a, None, None, None) + + def test_read_with_partial_args(self): + """ + Test that an ExtensionInformation object with some data can be read + from a data stream. + """ + self._test_read(self.encoding_b, self.extension_name_b, None, None) + + def test_read_with_multiple_args(self): + """ + Test that an ExtensionInformation object with data can be read from a + data stream. + """ + self._test_read(self.encoding_c, self.extension_name_c, + self.extension_tag_c, self.extension_type_c) + + def _test_write(self, stream_expected, extension_name, extension_tag, + extension_type): + stream_observed = BytearrayStream() + extension_information = ExtensionInformation( + extension_name=extension_name, + extension_tag=extension_tag, + extension_type=extension_type) + extension_information.write(stream_observed) + + length_expected = len(stream_expected) + length_observed = len(stream_observed) + + msg = "encoding lengths not equal" + msg += "; expected {0}, observed {1}".format( + length_expected, length_observed) + self.assertEqual(length_expected, length_observed, msg) + + msg = "encoding mismatch" + msg += ";\nexpected:\n{0}\nobserved:\n{1}".format( + stream_expected, stream_observed) + self.assertEqual(stream_expected, stream_observed, msg) + + def test_write_with_none(self): + """ + Test that an ExtensionInformation object with no data can be written + to a data stream. + """ + self._test_write(self.encoding_a, None, None, None) + + def test_write_with_partial_args(self): + """ + Test that an ExtensionInformation object with some data can be written + to a data stream. + """ + self._test_write(self.encoding_b, self.extension_name_b, None, None) + + def test_write_with_multiple_args(self): + """ + Test that an ExtensionInformation object with data can be written to + a data stream. + """ + self._test_write(self.encoding_c, self.extension_name_c, + self.extension_tag_c, self.extension_type_c) + + def _test_create(self, extension_name, extension_tag, extension_type): + extension_information = ExtensionInformation.create( + extension_name=extension_name, + extension_tag=extension_tag, + extension_type=extension_type) + + self.assertIsInstance(extension_information, ExtensionInformation) + + expected = ExtensionName(extension_name) + observed = extension_information.extension_name + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + expected = ExtensionTag(extension_tag) + observed = extension_information.extension_tag + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + expected = ExtensionType(extension_type) + observed = extension_information.extension_type + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_create_with_none(self): + """ + Test that an ExtensionInformation object with no data can be created + using the create class method. + """ + self._test_create(None, None, None) + + def test_create_with_args(self): + """ + Test that an ExtensionInformation object with data can be created + using the create class method. + """ + self._test_create('ACME LOCATION', 5548545, 7) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + ExtensionInformation objects with the same internal data. + """ + a = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + b = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_equal_and_empty(self): + """ + Test that the equality operator returns True when comparing two + ExtensionInformation objects with no internal data. + """ + a = ExtensionInformation() + b = ExtensionInformation() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal(self): + """ + Test that the equality operator returns False when comparing two + ExtensionInformation objects with different sets of internal data. + """ + a = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + b = ExtensionInformation() + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing an + ExtensionInformation object with a non-ExtensionInformation object. + """ + a = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + b = "invalid" + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + ExtensionInformation objects with the same internal data. + """ + a = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + b = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_equal_and_empty(self): + """ + Test that the inequality operator returns False when comparing two + ExtensionInformation objects with no internal data. + """ + a = ExtensionInformation() + b = ExtensionInformation() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal(self): + """ + Test that the inequality operator returns True when comparing two + ExtensionInformation objects with the different sets of internal data. + """ + a = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + b = ExtensionInformation() + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing an + ExtensionInformation object with a non-ExtensionInformation object. + """ + a = ExtensionInformation( + extension_name=self.extension_name_c, + extension_tag=self.extension_tag_c, + extension_type=self.extension_type_c) + b = "invalid" + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr_with_no_data(self): + """ + Test that the representation of an ExtensionInformation object with no + data is formatted properly and can be used by eval to create a new + ExtensionInformation object identical to the original. + """ + extension_information = ExtensionInformation() + + expected = "ExtensionInformation(" + expected += "extension_name=ExtensionName(value=''), " + expected += "extension_tag=None, " + expected += "extension_type=None)" + observed = repr(extension_information) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + expected = extension_information + observed = eval(repr(extension_information)) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_repr_with_data(self): + """ + Test that the representation of an ExtensionInformation object with + data is formatted properly and can be used by eval to create a new + ExtensionInformation object identical to the original. + """ + extension_information = ExtensionInformation( + extension_name=ExtensionName('ACME LOCATION'), + extension_tag=ExtensionTag(5548545), + extension_type=ExtensionType(7)) + + expected = "ExtensionInformation(" + expected += "extension_name=ExtensionName(value='ACME LOCATION'), " + expected += "extension_tag=ExtensionTag(value=5548545), " + expected += "extension_type=ExtensionType(value=7))" + observed = repr(extension_information) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + expected = extension_information + observed = eval(repr(extension_information)) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_str_with_no_data(self): + """ + Test that the string representation of an ExtensionInformation object + is formatted properly when there is no internal data. + """ + extension_information = ExtensionInformation() + + expected = "ExtensionInformation(" + expected += "extension_name=ExtensionName(value=''), " + expected += "extension_tag=None, " + expected += "extension_type=None)" + observed = str(extension_information) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + + def test_str_with_data(self): + """ + Test that the string representation of an ExtensionInformation object + is formatted properly when there is internal data. + """ + extension_information = ExtensionInformation( + extension_name=ExtensionName('ACME LOCATION'), + extension_tag=ExtensionTag(5548545), + extension_type=ExtensionType(7)) + + expected = "ExtensionInformation(" + expected += "extension_name=ExtensionName(value='ACME LOCATION'), " + expected += "extension_tag=ExtensionTag(value=5548545), " + expected += "extension_type=ExtensionType(value=7))" + observed = str(extension_information) + + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) diff --git a/kmip/tests/core/objects/test_objects.py b/kmip/tests/core/objects/test_objects.py new file mode 100644 index 0000000..63f6ae6 --- /dev/null +++ b/kmip/tests/core/objects/test_objects.py @@ -0,0 +1,168 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from six import string_types +from testtools import TestCase + +from kmip.core.objects import ExtensionName +from kmip.core.objects import ExtensionTag +from kmip.core.objects import ExtensionType + + +class TestExtensionName(TestCase): + """ + A test suite for the ExtensionName class. + + Since ExtensionName is a simple wrapper for the TextString primitive, only + a few tests pertaining to construction are needed. + """ + + def setUp(self): + super(TestExtensionName, self).setUp() + + def tearDown(self): + super(TestExtensionName, self).tearDown() + + def _test_init(self, value): + if (isinstance(value, string_types)) or (value is None): + extension_name = ExtensionName(value) + + if value is None: + value = '' + + msg = "expected {0}, observed {1}".format( + value, extension_name.value) + self.assertEqual(value, extension_name.value, msg) + else: + self.assertRaises(TypeError, ExtensionName, value) + + def test_init_with_none(self): + """ + Test that an ExtensionName object can be constructed with no specified + value. + """ + self._test_init(None) + + def test_init_with_valid(self): + """ + Test that an ExtensionName object can be constructed with a valid + string value. + """ + self._test_init("valid") + + def test_init_with_invalid(self): + """ + Test that a TypeError exception is raised when a non-string value is + used to construct an ExtensionName object. + """ + self._test_init(0) + + +class TestExtensionTag(TestCase): + """ + A test suite for the ExtensionTag class. + + Since ExtensionTag is a simple wrapper for the Integer primitive, only a + few tests pertaining to construction are needed. + """ + + def setUp(self): + super(TestExtensionTag, self).setUp() + + def tearDown(self): + super(TestExtensionTag, self).tearDown() + + def _test_init(self, value): + if (isinstance(value, int)) or (value is None): + extension_tag = ExtensionTag(value) + + if value is None: + value = 0 + + msg = "expected {0}, observed {1}".format( + value, extension_tag.value) + self.assertEqual(value, extension_tag.value, msg) + else: + self.assertRaises(TypeError, ExtensionTag, value) + + def test_init_with_none(self): + """ + Test that an ExtensionTag object can be constructed with no specified + value. + """ + self._test_init(None) + + def test_init_with_valid(self): + """ + Test that an ExtensionTag object can be constructed with a valid + integer value. + """ + self._test_init(0) + + def test_init_with_invalid(self): + """ + Test that a TypeError exception is raised when a non-integer value is + used to construct an ExtensionName object. + """ + self._test_init("invalid") + + +class TestExtensionType(TestCase): + """ + A test suite for the ExtensionType class. + + Since ExtensionType is a simple wrapper for the Integer primitive, only a + few tests pertaining to construction are needed. + """ + + def setUp(self): + super(TestExtensionType, self).setUp() + + def tearDown(self): + super(TestExtensionType, self).tearDown() + + def _test_init(self, value): + if (isinstance(value, int)) or (value is None): + extension_type = ExtensionType(value) + + if value is None: + value = 0 + + msg = "expected {0}, observed {1}".format( + value, extension_type.value) + self.assertEqual(value, extension_type.value, msg) + else: + self.assertRaises(TypeError, ExtensionType, value) + + def test_init_with_none(self): + """ + Test that an ExtensionType object can be constructed with no specified + value. + """ + self._test_init(None) + + def test_init_with_valid(self): + """ + Test that an ExtensionType object can be constructed with a valid + integer value. + """ + self._test_init(0) + + def test_init_with_invalid(self): + """ + Test that a TypeError exception is raised when a non-string value is + used to construct an ExtensionType object. + """ + self._test_init("invalid") diff --git a/kmip/tests/services/test_kmip_client.py b/kmip/tests/services/test_kmip_client.py index 1b9880c..bfae85c 100644 --- a/kmip/tests/services/test_kmip_client.py +++ b/kmip/tests/services/test_kmip_client.py @@ -30,6 +30,7 @@ from kmip.core.enums import CryptographicUsageMask from kmip.core.enums import ObjectType from kmip.core.enums import Operation as OperationEnum from kmip.core.enums import KeyFormatType +from kmip.core.enums import QueryFunction as QueryFunctionEnum from kmip.core.enums import ResultStatus from kmip.core.enums import ResultReason @@ -51,10 +52,15 @@ from kmip.core.messages.payloads.create_key_pair import \ CreateKeyPairRequestPayload, CreateKeyPairResponsePayload from kmip.core.messages.payloads.discover_versions import \ DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload +from kmip.core.messages.payloads.query import \ + QueryRequestPayload, QueryResponsePayload from kmip.core.messages.payloads.rekey_key_pair import \ RekeyKeyPairRequestPayload, RekeyKeyPairResponsePayload from kmip.core.misc import Offset +from kmip.core.misc import QueryFunction +from kmip.core.misc import ServerInformation +from kmip.core.misc import VendorIdentification from kmip.core.objects import Attribute from kmip.core.objects import CommonTemplateAttribute @@ -68,6 +74,7 @@ from kmip.services.kmip_client import KMIPProxy from kmip.services.results import CreateKeyPairResult from kmip.services.results import DiscoverVersionsResult +from kmip.services.results import QueryResult from kmip.services.results import RekeyKeyPairResult import kmip.core.utils as utils @@ -594,6 +601,41 @@ class TestKMIPClient(TestCase): self._test_build_rekey_key_pair_batch_item( None, None, None, None, None) + def _test_build_query_batch_item(self, query_functions): + batch_item = self.client._build_query_batch_item(query_functions) + + base = "expected {0}, received {1}" + msg = base.format(RequestBatchItem, batch_item) + self.assertIsInstance(batch_item, RequestBatchItem, msg) + + operation = batch_item.operation + + msg = base.format(Operation, operation) + self.assertIsInstance(operation, Operation, msg) + + operation_enum = operation.enum + + msg = base.format(OperationEnum.QUERY, operation_enum) + self.assertEqual(OperationEnum.QUERY, operation_enum, msg) + + payload = batch_item.request_payload + + if query_functions is None: + query_functions = list() + + msg = base.format(QueryRequestPayload, payload) + self.assertIsInstance(payload, QueryRequestPayload, msg) + + query_functions_observed = payload.query_functions + self.assertEqual(query_functions, query_functions_observed) + + def test_build_query_batch_item_with_input(self): + self._test_build_query_batch_item( + [QueryFunction(QueryFunctionEnum.QUERY_OBJECTS)]) + + def test_build_query_batch_item_without_input(self): + self._test_build_query_batch_item(None) + def _test_build_discover_versions_batch_item(self, protocol_versions): batch_item = self.client._build_discover_versions_batch_item( protocol_versions) @@ -680,6 +722,10 @@ class TestKMIPClient(TestCase): self.assertRaisesRegexp(ValueError, "no processor for operation", self.client._get_batch_item_processor, None) + def _test_equality(self, expected, observed): + msg = "expected {0}, observed {1}".format(expected, observed) + self.assertEqual(expected, observed, msg) + def test_process_create_key_pair_batch_item(self): batch_item = ResponseBatchItem( operation=Operation(OperationEnum.CREATE_KEY_PAIR), @@ -698,6 +744,64 @@ class TestKMIPClient(TestCase): msg = "expected {0}, received {1}".format(RekeyKeyPairResult, result) self.assertIsInstance(result, RekeyKeyPairResult, msg) + def _test_process_query_batch_item( + self, + operations, + object_types, + vendor_identification, + server_information, + application_namespaces, + extension_information): + + payload = QueryResponsePayload( + operations, + object_types, + vendor_identification, + server_information, + application_namespaces, + extension_information) + batch_item = ResponseBatchItem( + operation=Operation(OperationEnum.QUERY), + response_payload=payload) + + result = self.client._process_query_batch_item(batch_item) + + base = "expected {0}, observed {1}" + msg = base.format(QueryResult, result) + self.assertIsInstance(result, QueryResult, msg) + + # The payload maps the following inputs to empty lists on None. + if operations is None: + operations = list() + if object_types is None: + object_types = list() + if application_namespaces is None: + application_namespaces = list() + if extension_information is None: + extension_information = list() + + self._test_equality(operations, result.operations) + self._test_equality(object_types, result.object_types) + self._test_equality( + vendor_identification, result.vendor_identification) + self._test_equality(server_information, result.server_information) + self._test_equality( + application_namespaces, result.application_namespaces) + self._test_equality( + extension_information, result.extension_information) + + def test_process_query_batch_item_with_results(self): + self._test_process_query_batch_item( + list(), + list(), + VendorIdentification(), + ServerInformation(), + list(), + list()) + + def test_process_query_batch_item_without_results(self): + self._test_process_query_batch_item(None, None, None, None, None, None) + def _test_process_discover_versions_batch_item(self, protocol_versions): batch_item = ResponseBatchItem( operation=Operation(OperationEnum.DISCOVER_VERSIONS),