Adding support for the Query operation

This change adds support for the Query operation, including updates to
the KMIP client and core object libraries, the KMIP client and core unit
test suites, and a Query unit demo.
This commit is contained in:
Peter Hamilton 2015-02-23 17:18:05 -05:00
parent 0c4f9cd9d0
commit 80ee64e600
22 changed files with 2731 additions and 17 deletions

View File

@ -458,6 +458,15 @@ class KeyRoleType(Enum):
PVKOTH = 0x00000015
# 9.1.3.2.24
class QueryFunction(Enum):
QUERY_OPERATIONS = 0x00000001
QUERY_OBJECTS = 0x00000002
QUERY_SERVER_INFORMATION = 0x00000003
QUERY_APPLICATION_NAMESPACES = 0x00000004
QUERY_EXTENSION_LIST = 0x00000005
QUERY_EXTENSION_MAP = 0x00000006
# 9.1.3.2.27
class Operation(Enum):
CREATE = 0x00000001

View File

@ -21,6 +21,7 @@ from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
@ -48,5 +49,8 @@ class RequestPayloadFactory(PayloadFactory):
def _create_destroy_payload(self):
return destroy.DestroyRequestPayload()
def _create_query_payload(self):
return query.QueryRequestPayload()
def _create_discover_versions_payload(self):
return discover_versions.DiscoverVersionsRequestPayload()

View File

@ -21,6 +21,7 @@ from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
@ -48,5 +49,8 @@ class ResponsePayloadFactory(PayloadFactory):
def _create_destroy_payload(self):
return destroy.DestroyResponsePayload()
def _create_query_payload(self):
return query.QueryResponsePayload()
def _create_discover_versions_payload(self):
return discover_versions.DiscoverVersionsResponsePayload()

View File

@ -0,0 +1,339 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six.moves import xrange
from kmip.core.attributes import ApplicationNamespace
from kmip.core.attributes import ObjectType
from kmip.core.enums import Tags
from kmip.core.messages.contents import Operation
from kmip.core.misc import QueryFunction
from kmip.core.misc import ServerInformation
from kmip.core.misc import VendorIdentification
from kmip.core.objects import ExtensionInformation
from kmip.core.primitives import Struct
from kmip.core.utils import BytearrayStream
class QueryRequestPayload(Struct):
"""
A request payload for the Query operation.
The payload contains a list of query functions that the KMIP server should
respond to. See Section 4.25 of the KMIP 1.1 specification for more
information.
Attributes:
query_functions: A list of QueryFunction enumerations.
"""
def __init__(self, query_functions=None):
"""
Construct a QueryRequestPayload object.
Args:
query_functions (list): A list of QueryFunction enumerations.
"""
super(QueryRequestPayload, self).__init__(Tags.REQUEST_PAYLOAD)
if query_functions is None:
self.query_functions = list()
else:
self.query_functions = query_functions
self.validate()
def read(self, istream):
"""
Read the data encoding the QueryRequestPayload object and decode it
into its constituent parts.
Args:
istream (Stream): A data stream containing encoded object data,
supporting a read method; usually a BytearrayStream object.
"""
super(QueryRequestPayload, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
while(self.is_tag_next(Tags.QUERY_FUNCTION, tstream)):
query_function = QueryFunction()
query_function.read(tstream)
self.query_functions.append(query_function)
self.is_oversized(tstream)
self.validate()
def write(self, ostream):
"""
Write the data encoding the QueryRequestPayload object to a stream.
Args:
ostream (Stream): A data stream in which to encode object data,
supporting a write method; usually a BytearrayStream object.
"""
tstream = BytearrayStream()
for query_function in self.query_functions:
query_function.write(tstream)
self.length = tstream.length()
super(QueryRequestPayload, self).write(ostream)
ostream.write(tstream.buffer)
def validate(self):
"""
Error check the attributes of the QueryRequestPayload object.
"""
self.__validate()
def __validate(self):
if isinstance(self.query_functions, list):
for i in xrange(len(self.query_functions)):
query_function = self.query_functions[i]
if not isinstance(query_function, QueryFunction):
msg = "invalid query function ({0} in list)".format(i)
msg += "; expected {0}, received {1}".format(
QueryFunction, query_function)
raise TypeError(msg)
else:
msg = "invalid query functions list"
msg += "; expected {0}, received {1}".format(
list, self.query_functions)
raise TypeError(msg)
class QueryResponsePayload(Struct):
"""
A response payload for the Query operation.
The payload contains different sets of responses that the KMIP server
provides in response to the initial Query request. See Section 4.25 of the
KMIP 1.1 specification for more information.
Attributes:
operations: A list of Operations supported by the server.
object_types: A list of ObjectTypes supported by the server.
vendor_identification: A string identifying the server vendor.
server_information: A structure containing vendor-specific fields and
substructures.
application_namespaces: A list of application namespaces supported by
the server.
extension_information: A list of ExtensionInformation objects detailing
Objects supported by the server with ItemTag values in the
Extensions range.
"""
def __init__(self, operations=None, object_types=None,
vendor_identification=None, server_information=None,
application_namespaces=None, extension_information=None):
"""
Construct a QueryResponsePayload object.
Args:
operations (list): A list of Operations supported by the server.
object_types (list): A list of ObjectTypes supported by the server.
vendor_identification (VendorIdentification): A string identifying
the server vendor.
server_information (ServerInformation): A structure containing
vendor-specific fields and substructures.
application_namespaces (list): A list of application namespaces
supported by the server.
extension_information (list): A list of ExtensionInformation
objects detailing Objects supported by the server with ItemTag
values in the Extensions range.
"""
super(QueryResponsePayload, self).__init__(Tags.RESPONSE_PAYLOAD)
if operations is None:
self.operations = list()
else:
self.operations = operations
if object_types is None:
self.object_types = list()
else:
self.object_types = object_types
self.vendor_identification = vendor_identification
self.server_information = server_information
if application_namespaces is None:
self.application_namespaces = list()
else:
self.application_namespaces = application_namespaces
if extension_information is None:
self.extension_information = list()
else:
self.extension_information = extension_information
self.validate()
def read(self, istream):
"""
Read the data encoding the QueryResponsePayload object and decode it
into its constituent parts.
Args:
istream (Stream): A data stream containing encoded object data,
supporting a read method; usually a BytearrayStream object.
"""
super(QueryResponsePayload, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
while(self.is_tag_next(Tags.OPERATION, tstream)):
operation = Operation()
operation.read(tstream)
self.operations.append(operation)
while(self.is_tag_next(Tags.OBJECT_TYPE, tstream)):
object_type = ObjectType()
object_type.read(tstream)
self.object_types.append(object_type)
if self.is_tag_next(Tags.VENDOR_IDENTIFICATION, tstream):
self.vendor_identification = VendorIdentification()
self.vendor_identification.read(tstream)
if self.is_tag_next(Tags.SERVER_INFORMATION, tstream):
self.server_information = ServerInformation()
self.server_information.read(tstream)
while(self.is_tag_next(Tags.APPLICATION_NAMESPACE, tstream)):
application_namespace = ApplicationNamespace()
application_namespace.read(tstream)
self.application_namespaces.append(application_namespace)
while(self.is_tag_next(Tags.EXTENSION_INFORMATION, tstream)):
extension_information = ExtensionInformation()
extension_information.read(tstream)
self.extension_information.append(extension_information)
self.is_oversized(tstream)
self.validate()
def write(self, ostream):
"""
Write the data encoding the QueryResponsePayload object to a stream.
Args:
ostream (Stream): A data stream in which to encode object data,
supporting a write method; usually a BytearrayStream object.
"""
tstream = BytearrayStream()
for operation in self.operations:
operation.write(tstream)
for object_type in self.object_types:
object_type.write(tstream)
if self.vendor_identification is not None:
self.vendor_identification.write(tstream)
if self.server_information is not None:
self.server_information.write(tstream)
for application_namespace in self.application_namespaces:
application_namespace.write(tstream)
for extension_information in self.extension_information:
extension_information.write(tstream)
self.length = tstream.length()
super(QueryResponsePayload, self).write(ostream)
ostream.write(tstream.buffer)
def validate(self):
"""
Error check the attributes of the QueryRequestPayload object.
"""
self.__validate()
def __validate(self):
# TODO (peter-hamilton) Add separate validate_list function for this
if isinstance(self.operations, list):
for i in xrange(len(self.operations)):
operation = self.operations[i]
if not isinstance(operation, Operation):
msg = "invalid operation ({0} in list)".format(i)
msg += "; expected {0}, received {1}".format(
Operation, operation)
raise TypeError(msg)
else:
msg = "invalid operations list"
msg += "; expected {0}, received {1}".format(
list, self.operations)
raise TypeError(msg)
if isinstance(self.object_types, list):
for i in xrange(len(self.object_types)):
object_type = self.object_types[i]
if not isinstance(object_type, ObjectType):
msg = "invalid object type ({0} in list)".format(i)
msg += "; expected {0}, received {1}".format(
ObjectType, object_type)
raise TypeError(msg)
else:
msg = "invalid object types list"
msg += "; expected {0}, received {1}".format(
list, self.object_types)
raise TypeError(msg)
if self.vendor_identification is not None:
if not isinstance(self.vendor_identification,
VendorIdentification):
msg = "invalid vendor identification"
msg += "; expected {0}, received {1}".format(
VendorIdentification, self.vendor_identification)
raise TypeError(msg)
if self.server_information is not None:
if not isinstance(self.server_information, ServerInformation):
msg = "invalid server information"
msg += "; expected {0}, received {1}".format(
ServerInformation, self.server_information)
raise TypeError(msg)
if isinstance(self.application_namespaces, list):
for i in xrange(len(self.application_namespaces)):
application_namespace = self.application_namespaces[i]
if not isinstance(application_namespace, ApplicationNamespace):
msg = "invalid application namespace ({0} in list)".format(
i)
msg += "; expected {0}, received {1}".format(
ApplicationNamespace, application_namespace)
raise TypeError(msg)
else:
msg = "invalid application namespaces list"
msg += "; expected {0}, received {1}".format(
list, self.application_namespaces)
raise TypeError(msg)
if isinstance(self.extension_information, list):
for i in xrange(len(self.extension_information)):
extension_information = self.extension_information[i]
if not isinstance(extension_information, ExtensionInformation):
msg = "invalid extension information ({0} in list)".format(
i)
msg += "; expected {0}, received {1}".format(
ExtensionInformation, extension_information)
raise TypeError(msg)
else:
msg = "invalid extension information list"
msg += "; expected {0}, received {1}".format(
list, self.extension_information)
raise TypeError(msg)

View File

@ -13,11 +13,172 @@
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core import enums
from kmip.core import primitives
from kmip.core.enums import Tags
from kmip.core.enums import QueryFunction as QueryFunctionEnum
from kmip.core.primitives import Enumeration
from kmip.core.primitives import Interval
from kmip.core.primitives import Struct
from kmip.core.primitives import TextString
from kmip.core.utils import BytearrayStream
class Offset(primitives.Interval):
class Offset(Interval):
"""
An integer representing a positive change in time.
def __init__(self, value=None, tag=enums.Tags.OFFSET):
super(Offset, self).__init__(value, tag)
Used by Rekey and Recertify requests to indicate the time difference
between the InitializationDate and the ActivationDate of the replacement
item to be created. See Sections 4.4, 4.5, and 4.8 of the KMIP v1.1
specification for more information.
"""
def __init__(self, value=None):
"""
Construct an Offset object.
Args:
value (int): An integer representing a positive change in time.
Optional, defaults to None.
"""
super(Offset, self).__init__(value, Tags.OFFSET)
class QueryFunction(Enumeration):
"""
An encodeable wrapper for the QueryFunction enumeration.
Used by Query requests to specify the information to retrieve from the
KMIP server. See Sections 4.25 and 9.1.3.2.24 of the KMIP v1.1
specification for more information.
"""
ENUM_TYPE = QueryFunctionEnum
def __init__(self, value=None):
"""
Construct a QueryFunction object.
Args:
value (QueryFunction enum): A QueryFunction enumeration value,
(e.g., QueryFunction.QUERY_OPERATIONS). Optional, default to
None.
"""
super(QueryFunction, self).__init__(value, Tags.QUERY_FUNCTION)
class VendorIdentification(TextString):
"""
A text string uniquely identifying a KMIP vendor.
Returned by KMIP servers upon receipt of a Query request for server
information. See Section 4.25 of the KMIP v1.1. specification for more
information.
"""
def __init__(self, value=None):
"""
Construct a VendorIdentification object.
Args:
value (str): A string describing a KMIP vendor. Optional, defaults
to None.
"""
super(VendorIdentification, self).__init__(
value, Tags.VENDOR_IDENTIFICATION)
class ServerInformation(Struct):
"""
A structure containing vendor-specific fields and/or substructures.
Returned by KMIP servers upon receipt of a Query request for server
information. See Section 4.25 of the KMIP v1.1 specification for more
information.
Note:
There are no example structures nor data encodings in the KMIP
documentation of this object. Therefore this class handles encoding and
decoding its data in a generic way, using a BytearrayStream for primary
storage. The intent is for vendor-specific subclasses to decide how to
decode this data from the stream attribute. Likewise, these subclasses
must decide how to encode their data into the stream attribute. There
are no arguments to the constructor and therefore no means by which to
validate the object's contents.
"""
def __init__(self):
"""
Construct a ServerInformation object.
"""
super(ServerInformation, self).__init__(Tags.SERVER_INFORMATION)
self.data = BytearrayStream()
self.validate()
def read(self, istream):
"""
Read the data encoding the ServerInformation object and decode it into
its constituent parts.
Args:
istream (Stream): A data stream containing encoded object data,
supporting a read method; usually a BytearrayStream object.
"""
super(ServerInformation, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
self.data = BytearrayStream(tstream.read())
self.is_oversized(tstream)
self.validate()
def write(self, ostream):
"""
Write the data encoding the ServerInformation object to a stream.
Args:
ostream (Stream): A data stream in which to encode object data,
supporting a write method; usually a BytearrayStream object.
"""
tstream = BytearrayStream()
tstream.write(self.data.buffer)
self.length = tstream.length()
super(ServerInformation, self).write(ostream)
ostream.write(tstream.buffer)
def validate(self):
"""
Error check the types of the different parts of the ServerInformation
object.
"""
self.__validate()
def __validate(self):
# NOTE (peter-hamilton): Intentional pass, no way to validate data.
pass
def __eq__(self, other):
if isinstance(other, ServerInformation):
if len(self.data) != len(other.data):
return False
elif self.data != other.data:
return False
else:
return True
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, ServerInformation):
return not (self == other)
else:
return NotImplemented
def __repr__(self):
return "ServerInformation()"
def __str__(self):
return str(self.data)

View File

@ -921,3 +921,244 @@ class PublicKeyTemplateAttribute(TemplateAttribute):
attributes=None):
super(PublicKeyTemplateAttribute, self).__init__(
names, attributes, Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE)
# 2.1.9
class ExtensionName(TextString):
"""
The name of an extended Object.
A part of ExtensionInformation, specifically identifying an Object that is
a custom vendor addition to the KMIP specification. See Section 2.1.9 of
the KMIP 1.1 specification for more information.
Attributes:
value: The string data representing the extension name.
"""
def __init__(self, value=''):
"""
Construct an ExtensionName object.
Args:
value (str): The string data representing the extension name.
Optional, defaults to the empty string.
"""
super(ExtensionName, self).__init__(value, Tags.EXTENSION_NAME)
class ExtensionTag(Integer):
"""
The tag of an extended Object.
A part of ExtensionInformation. See Section 2.1.9 of the KMIP 1.1
specification for more information.
Attributes:
value: The tag number identifying the extended object.
"""
def __init__(self, value=0):
"""
Construct an ExtensionTag object.
Args:
value (int): A number representing the extension tag. Often
displayed in hex format. Optional, defaults to 0.
"""
super(ExtensionTag, self).__init__(value, Tags.EXTENSION_TAG)
class ExtensionType(Integer):
"""
The type of an extended Object.
A part of ExtensionInformation, specifically identifying the type of the
Object in the specification extension. See Section 2.1.9 of the KMIP 1.1
specification for more information.
Attributes:
value: The type enumeration for the extended object.
"""
def __init__(self, value=None):
"""
Construct an ExtensionType object.
Args:
value (Types): A number representing a Types enumeration value,
indicating the type of the extended Object. Optional, defaults
to None.
"""
super(ExtensionType, self).__init__(value, Tags.EXTENSION_TYPE)
class ExtensionInformation(Struct):
"""
A structure describing Objects defined in KMIP specification extensions.
It is used specifically for Objects with Item Tag values in the Extensions
range and appears in responses to Query requests for server extension
information. See Sections 2.1.9 and 4.25 of the KMIP 1.1 specification for
more information.
Attributes:
extension_name: The name of the extended Object.
extension_tag: The tag of the extended Object.
extension_type: The type of the extended Object.
"""
def __init__(self, extension_name=None, extension_tag=None,
extension_type=None):
"""
Construct an ExtensionInformation object.
Args:
extension_name (ExtensionName): The name of the extended Object.
extension_tag (ExtensionTag): The tag of the extended Object.
extension_type (ExtensionType): The type of the extended Object.
"""
super(ExtensionInformation, self).__init__(Tags.EXTENSION_INFORMATION)
if extension_name is None:
self.extension_name = ExtensionName()
else:
self.extension_name = extension_name
self.extension_tag = extension_tag
self.extension_type = extension_type
self.validate()
def read(self, istream):
"""
Read the data encoding the ExtensionInformation object and decode it
into its constituent parts.
Args:
istream (Stream): A data stream containing encoded object data,
supporting a read method; usually a BytearrayStream object.
"""
super(ExtensionInformation, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
self.extension_name.read(tstream)
if self.is_tag_next(Tags.EXTENSION_TAG, tstream):
self.extension_tag = ExtensionTag()
self.extension_tag.read(tstream)
if self.is_tag_next(Tags.EXTENSION_TYPE, tstream):
self.extension_type = ExtensionType()
self.extension_type.read(tstream)
self.is_oversized(tstream)
self.validate()
def write(self, ostream):
"""
Write the data encoding the ExtensionInformation object to a stream.
Args:
ostream (Stream): A data stream in which to encode object data,
supporting a write method; usually a BytearrayStream object.
"""
tstream = BytearrayStream()
self.extension_name.write(tstream)
if self.extension_tag is not None:
self.extension_tag.write(tstream)
if self.extension_type is not None:
self.extension_type.write(tstream)
self.length = tstream.length()
super(ExtensionInformation, self).write(ostream)
ostream.write(tstream.buffer)
def validate(self):
"""
Error check the attributes of the ExtensionInformation object.
"""
self.__validate()
def __validate(self):
if not isinstance(self.extension_name, ExtensionName):
msg = "invalid extension name"
msg += "; expected {0}, received {1}".format(
ExtensionName, self.extension_name)
raise TypeError(msg)
if self.extension_tag is not None:
if not isinstance(self.extension_tag, ExtensionTag):
msg = "invalid extension tag"
msg += "; expected {0}, received {1}".format(
ExtensionTag, self.extension_tag)
raise TypeError(msg)
if self.extension_type is not None:
if not isinstance(self.extension_type, ExtensionType):
msg = "invalid extension type"
msg += "; expected {0}, received {1}".format(
ExtensionType, self.extension_type)
raise TypeError(msg)
def __eq__(self, other):
if isinstance(other, ExtensionInformation):
if self.extension_name != other.extension_name:
return False
elif self.extension_tag != other.extension_tag:
return False
elif self.extension_type != other.extension_type:
return False
else:
return True
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, ExtensionInformation):
return not (self == other)
else:
return NotImplemented
def __repr__(self):
name = "extension_name={0}".format(repr(self.extension_name))
tag = "extension_tag={0}".format(repr(self.extension_tag))
typ = "extension_type={0}".format(repr(self.extension_type))
return "ExtensionInformation({0}, {1}, {2})".format(name, tag, typ)
def __str__(self):
return repr(self)
@classmethod
def create(cls, extension_name=None, extension_tag=None,
extension_type=None):
"""
Construct an ExtensionInformation object from provided extension
values.
Args:
extension_name (str): The name of the extension. Optional,
defaults to None.
extension_tag (int): The tag number of the extension. Optional,
defaults to None.
extension_type (int): The type index of the extension. Optional,
defaults to None.
Returns:
ExtensionInformation: The newly created set of extension
information.
Example:
>>> x = ExtensionInformation.create('extension', 1, 1)
>>> x.extension_name.value
ExtensionName(value='extension')
>>> x.extension_tag.value
ExtensionTag(value=1)
>>> x.extension_type.value
ExtensionType(value=1)
"""
extension_name = ExtensionName(extension_name)
extension_tag = ExtensionTag(extension_tag)
extension_type = ExtensionType(extension_type)
return ExtensionInformation(
extension_name=extension_name,
extension_tag=extension_tag,
extension_type=extension_type)

View File

@ -156,8 +156,9 @@ class Struct(Base):
def __init__(self, tag=Tags.DEFAULT):
super(Struct, self).__init__(tag, type=Types.STRUCTURE)
# NOTE (peter-hamilton) If seen, should indicate repr needs to be defined
def __repr__(self):
return '<Struct>'
return "Struct()"
class Integer(Base):
@ -216,7 +217,10 @@ class Integer(Base):
num_bytes)
def __repr__(self):
return '<Integer, %d>' % (self.value)
return "{0}(value={1})".format(type(self).__name__, repr(self.value))
def __str__(self):
return "{0}".format(repr(self.value))
def __eq__(self, other):
if isinstance(other, Integer):
@ -413,7 +417,11 @@ class Enumeration(Integer):
Enum, type(self.enum)))
def __repr__(self):
return '<Enumeration, %s, %d>' % (self.enum.name, self.enum.value)
return "Enumeration(value={0})".format(self.enum)
def __str__(self):
return "{0} - {1} - {2}".format(
type(self.enum), self.enum.name, self.enum.value)
class Boolean(Base):
@ -550,7 +558,10 @@ class TextString(Base):
data_type))
def __repr__(self):
return '<TextString, %s>' % (self.value)
return "{0}(value={1})".format(type(self).__name__, repr(self.value))
def __str__(self):
return "{0}".format(repr(self.value))
def __eq__(self, other):
if isinstance(other, TextString):

View File

@ -66,7 +66,7 @@ def build_er_error(class_object, descriptor, expected, received,
class BytearrayStream(io.RawIOBase):
def __init__(self, data=None):
if data is None:
self.buffer = b''
self.buffer = bytes()
else:
self.buffer = bytes(data)
@ -81,10 +81,11 @@ class BytearrayStream(io.RawIOBase):
return data
def readall(self):
data = self.buffer[0:]
self.buffer = self.buffer[len(self.buffer):]
data = self.buffer
self.buffer = bytes()
return data
# TODO (peter-hamilton) Unused, add documentation or cut.
def readinto(self, b):
if len(b) <= len(self.buffer):
num_bytes_to_read = len(b)
@ -117,6 +118,17 @@ class BytearrayStream(io.RawIOBase):
def __eq__(self, other):
if isinstance(other, BytearrayStream):
return (self.buffer == other.buffer)
if len(self.buffer) != len(other.buffer):
return False
elif self.buffer != other.buffer:
return False
else:
return True
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, BytearrayStream):
return not (self == other)
else:
return NotImplemented

110
kmip/demos/units/query.py Normal file
View File

@ -0,0 +1,110 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import sys
from six.moves import xrange
from kmip.core.enums import Operation
from kmip.core.enums import QueryFunction as QueryFunctionEnum
from kmip.core.enums import ResultStatus
from kmip.core.misc import QueryFunction
from kmip.demos import utils
from kmip.services.kmip_client import KMIPProxy
if __name__ == '__main__':
# Build and parse arguments
parser = utils.build_cli_parser(Operation.QUERY)
opts, args = parser.parse_args(sys.argv[1:])
username = opts.username
password = opts.password
# Build and setup logging and needed factories
f_log = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir,
'logconfig.ini')
logging.config.fileConfig(f_log)
logger = logging.getLogger(__name__)
# Build query function list.
query_functions = list()
query_functions.append(
QueryFunction(QueryFunctionEnum.QUERY_OPERATIONS))
query_functions.append(
QueryFunction(QueryFunctionEnum.QUERY_OBJECTS))
query_functions.append(
QueryFunction(QueryFunctionEnum.QUERY_SERVER_INFORMATION))
query_functions.append(
QueryFunction(QueryFunctionEnum.QUERY_APPLICATION_NAMESPACES))
query_functions.append(
QueryFunction(QueryFunctionEnum.QUERY_EXTENSION_LIST))
query_functions.append(
QueryFunction(QueryFunctionEnum.QUERY_EXTENSION_MAP))
# Build the client and connect to the server
client = KMIPProxy()
client.open()
result = client.query(query_functions=query_functions)
client.close()
# Display operation results
logger.info('query() result status: {0}'.format(
result.result_status.enum))
if result.result_status.enum == ResultStatus.SUCCESS:
operations = result.operations
object_types = result.object_types
vendor_identification = result.vendor_identification
server_information = result.server_information
application_namespaces = result.application_namespaces
extension_information = result.extension_information
logger.info('number of operations supported: {0}'.format(
len(operations)))
for i in xrange(len(operations)):
logger.info('operation supported: {0}'.format(operations[i]))
logger.info('number of object types supported: {0}'.format(
len(object_types)))
for i in xrange(len(object_types)):
logger.info('object type supported: {0}'.format(object_types[i]))
logger.info('vendor identification: {0}'.format(vendor_identification))
logger.info('server information: {0}'.format(server_information))
logger.info('number of application namespaces supported: {0}'.format(
len(application_namespaces)))
for i in xrange(len(application_namespaces)):
logger.info('application namespace supported: {0}'.format(
application_namespaces[i]))
logger.info('number of extensions supported: {0}'.format(
len(extension_information)))
for i in xrange(len(extension_information)):
logger.info('extension supported: {0}'.format(
extension_information[i]))
else:
logger.info('query() result reason: {0}'.format(
result.result_reason.enum))
logger.info('query() result message: {0}'.format(
result.result_message.value))

View File

@ -119,6 +119,8 @@ def build_cli_parser(operation):
default=None,
dest="length",
help="Key length in bits (e.g., 128, 256)")
elif operation is Operation.QUERY:
pass
elif operation is Operation.DISCOVER_VERSIONS:
pass
else:

View File

@ -19,6 +19,7 @@ from kmip.services.results import DestroyResult
from kmip.services.results import DiscoverVersionsResult
from kmip.services.results import GetResult
from kmip.services.results import LocateResult
from kmip.services.results import QueryResult
from kmip.services.results import RegisterResult
from kmip.services.results import RekeyKeyPairResult
@ -45,6 +46,7 @@ from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
@ -176,6 +178,31 @@ class KMIPProxy(KMIP):
object_group_member=object_group_member,
attributes=attributes, credential=credential)
def query(self, batch=False, query_functions=None, credential=None):
"""
Send a Query request to the server.
Args:
batch (boolean): A flag indicating if the operation should be sent
with a batch of additional operations. Defaults to False.
query_functions (list): A list of QueryFunction enumerations
indicating what information the client wants from the server.
Optional, defaults to None.
credential (Credential): A Credential object containing
authentication information for the server. Optional, defaults
to None.
"""
batch_item = self._build_query_batch_item(query_functions)
# TODO (peter-hamilton): Replace this with official client batch mode.
if batch:
self.batch_items.append(batch_item)
else:
request = self._build_request_message(credential, [batch_item])
response = self._send_and_receive_message(request)
results = self._process_batch_items(response)
return results[0]
def discover_versions(self, batch=False, protocol_versions=None,
credential=None):
batch_item = self._build_discover_versions_batch_item(
@ -257,6 +284,13 @@ class KMIPProxy(KMIP):
operation=operation, request_payload=payload)
return batch_item
def _build_query_batch_item(self, query_functions=None):
operation = Operation(OperationEnum.QUERY)
payload = query.QueryRequestPayload(query_functions)
batch_item = messages.RequestBatchItem(
operation=operation, request_payload=payload)
return batch_item
def _build_discover_versions_batch_item(self, protocol_versions=None):
operation = Operation(OperationEnum.DISCOVER_VERSIONS)
@ -281,6 +315,8 @@ class KMIPProxy(KMIP):
return self._process_create_key_pair_batch_item
elif operation == OperationEnum.REKEY_KEY_PAIR:
return self._process_rekey_key_pair_batch_item
elif operation == OperationEnum.QUERY:
return self._process_query_batch_item
elif operation == OperationEnum.DISCOVER_VERSIONS:
return self._process_discover_versions_batch_item
else:
@ -317,6 +353,35 @@ class KMIPProxy(KMIP):
return self._process_key_pair_batch_item(
batch_item, RekeyKeyPairResult)
def _process_query_batch_item(self, batch_item):
payload = batch_item.response_payload
operations = None
object_types = None
vendor_identification = None
server_information = None
application_namespaces = None
extension_information = None
if payload is not None:
operations = payload.operations
object_types = payload.object_types
vendor_identification = payload.vendor_identification
server_information = payload.server_information
application_namespaces = payload.application_namespaces
extension_information = payload.extension_information
return QueryResult(
batch_item.result_status,
batch_item.result_reason,
batch_item.result_message,
operations,
object_types,
vendor_identification,
server_information,
application_namespaces,
extension_information)
def _process_discover_versions_batch_item(self, batch_item):
payload = batch_item.response_payload

View File

@ -174,6 +174,60 @@ class LocateResult(OperationResult):
self.uuids = uuids
class QueryResult(OperationResult):
"""
A container for the results of a Query operation.
Attributes:
result_status: The status of the Query operation (e.g., success or
failure).
result_reason: The reason for the operation status.
result_message: Extra information pertaining to the status reason.
operations: A list of Operations supported by the server.
object_types: A list of Object Types supported by the server.
vendor_identification:
server_information:
application_namespaces: A list of namespaces supported by the server.
extension_information: A list of extensions supported by the server.
"""
def __init__(self,
result_status,
result_reason=None,
result_message=None,
operations=None,
object_types=None,
vendor_identification=None,
server_information=None,
application_namespaces=None,
extension_information=None):
super(QueryResult, self).__init__(
result_status, result_reason, result_message)
if operations is None:
self.operations = list()
else:
self.operations = operations
if object_types is None:
self.object_types = list()
else:
self.object_types = object_types
self.vendor_identification = vendor_identification
self.server_information = server_information
if application_namespaces is None:
self.application_namespaces = list()
else:
self.application_namespaces = application_namespaces
if extension_information is None:
self.extension_information = list()
else:
self.extension_information = extension_information
class DiscoverVersionsResult(OperationResult):
def __init__(self,

View File

@ -24,6 +24,7 @@ from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
@ -138,8 +139,8 @@ class TestRequestPayloadFactory(testtools.TestCase):
self.factory.create, Operation.VALIDATE)
def test_create_query_payload(self):
self._test_not_implemented(
self.factory.create, Operation.QUERY)
payload = self.factory.create(Operation.QUERY)
self._test_payload_type(payload, query.QueryRequestPayload)
def test_create_cancel_payload(self):
self._test_not_implemented(

View File

@ -24,6 +24,7 @@ from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
@ -138,8 +139,8 @@ class TestResponsePayloadFactory(testtools.TestCase):
self.factory.create, Operation.VALIDATE)
def test_create_query_payload(self):
self._test_not_implemented(
self.factory.create, Operation.QUERY)
payload = self.factory.create(Operation.QUERY)
self._test_payload_type(payload, query.QueryResponsePayload)
def test_create_cancel_payload(self):
self._test_not_implemented(

View File

@ -0,0 +1,587 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six.moves import xrange
from testtools import TestCase
from kmip.core import utils
from kmip.core.attributes import ObjectType
from kmip.core.enums import ObjectType as ObjectTypeEnum
from kmip.core.enums import Operation as OperationEnum
from kmip.core.enums import QueryFunction as QueryFunctionEnum
from kmip.core.messages.contents import Operation
from kmip.core.messages.payloads import query
from kmip.core.misc import QueryFunction
from kmip.core.misc import VendorIdentification
from kmip.core.misc import ServerInformation
from kmip.core.objects import ExtensionInformation
from kmip.core.objects import ExtensionName
class TestQueryRequestPayload(TestCase):
"""
Test suite for the QueryRequestPayload class.
Test encodings obtained from Sections 12.1 and 12.2 of the KMIP 1.1 Test
Cases documentation.
"""
def setUp(self):
super(TestQueryRequestPayload, self).setUp()
self.query_functions_a = list()
self.query_functions_b = list()
self.query_functions_c = list()
self.query_functions_b.append(QueryFunction(
QueryFunctionEnum.QUERY_OPERATIONS))
self.query_functions_b.append(QueryFunction(
QueryFunctionEnum.QUERY_OBJECTS))
self.query_functions_b.append(QueryFunction(
QueryFunctionEnum.QUERY_SERVER_INFORMATION))
self.query_functions_c.append(QueryFunction(
QueryFunctionEnum.QUERY_EXTENSION_LIST))
self.encoding_a = utils.BytearrayStream((
b'\x42\x00\x79\x01\x00\x00\x00\x00'))
self.encoding_b = utils.BytearrayStream((
b'\x42\x00\x79\x01\x00\x00\x00\x30\x42\x00\x74\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x74\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x74\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x03\x00\x00\x00\x00'))
self.encoding_c = utils.BytearrayStream((
b'\x42\x00\x79\x01\x00\x00\x00\x10\x42\x00\x74\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x05\x00\x00\x00\x00'))
def tearDown(self):
super(TestQueryRequestPayload, self).tearDown()
def test_init_with_none(self):
"""
Test that a QueryRequestPayload object can be constructed with no
specified value.
"""
query.QueryRequestPayload()
def test_init_with_args(self):
"""
Test that a QueryRequestPayload object can be constructed with valid
values.
"""
query.QueryRequestPayload(self.query_functions_a)
query.QueryRequestPayload(self.query_functions_b)
query.QueryRequestPayload(self.query_functions_c)
def test_validate_with_invalid_query_functions_list(self):
"""
Test that a TypeError exception is raised when an invalid QueryFunction
list is used to construct a QueryRequestPayload object.
"""
kwargs = {'query_functions': 'invalid'}
self.assertRaisesRegexp(
TypeError, "invalid query functions list",
query.QueryRequestPayload, **kwargs)
def test_validate_with_invalid_query_functions_item(self):
"""
Test that a TypeError exception is raised when an invalid QueryFunction
item is used to construct a QueryRequestPayload object.
"""
kwargs = {'query_functions': ['invalid']}
self.assertRaisesRegexp(
TypeError, "invalid query function",
query.QueryRequestPayload, **kwargs)
def _test_read(self, stream, query_functions):
payload = query.QueryRequestPayload()
payload.read(stream)
expected = len(query_functions)
observed = len(payload.query_functions)
msg = "query functions list decoding mismatch"
msg += "; expected {0} results, received {1}".format(
expected, observed)
self.assertEqual(expected, observed, msg)
for i in xrange(len(query_functions)):
expected = query_functions[i]
observed = payload.query_functions[i]
msg = "query function decoding mismatch"
msg += "; expected {0}, received {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
def test_read_with_empty_query_functions_list(self):
"""
Test that a QueryRequestPayload object with no data can be read from
a data stream.
"""
self._test_read(self.encoding_a, self.query_functions_a)
def test_read_with_multiple_query_functions(self):
"""
Test that a QueryRequestPayload object with multiple pieces of data
can be read from a data stream.
"""
self._test_read(self.encoding_b, self.query_functions_b)
def test_read_with_one_query_function(self):
"""
Test that a QueryRequestPayload object with a single piece of data can
be read from a data stream.
"""
self._test_read(self.encoding_c, self.query_functions_c)
def _test_write(self, encoding, query_functions):
stream = utils.BytearrayStream()
payload = query.QueryRequestPayload(query_functions)
payload.write(stream)
length_expected = len(encoding)
length_received = len(stream)
msg = "encoding lengths not equal"
msg += "; expected {0}, received {1}".format(
length_expected, length_received)
self.assertEqual(length_expected, length_received, msg)
msg = "encoding mismatch"
msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(encoding, stream)
self.assertEqual(encoding, stream, msg)
def test_write_with_empty_query_functions_list(self):
"""
Test that a QueryRequestPayload object with no data can be written to
a data stream.
"""
self._test_write(self.encoding_a, self.query_functions_a)
def test_write_with_multiple_query_functions(self):
"""
Test that a QueryRequestPayload object with multiple pieces of data
can be written to a data stream.
"""
self._test_write(self.encoding_b, self.query_functions_b)
def test_write_with_one_query_function(self):
"""
Test that a QueryRequestPayload object with a single piece of data can
be written to a data stream.
"""
self._test_write(self.encoding_c, self.query_functions_c)
class TestQueryResponsePayload(TestCase):
"""
Test encodings obtained from Sections 12.1 and 12.2 of the KMIP 1.1 Test
Cases documentation.
"""
def setUp(self):
super(TestQueryResponsePayload, self).setUp()
self.operations = list()
self.object_types = list()
self.application_namespaces = list()
self.extension_information = list()
self.vendor_identification = VendorIdentification(
"IBM test server, not-TKLM 2.0.1.1 KMIP 2.0.0.1")
self.server_information = ServerInformation()
self.operations.append(Operation(OperationEnum.CREATE))
self.operations.append(Operation(OperationEnum.CREATE_KEY_PAIR))
self.operations.append(Operation(OperationEnum.REGISTER))
self.operations.append(Operation(OperationEnum.REKEY))
self.operations.append(Operation(OperationEnum.CERTIFY))
self.operations.append(Operation(OperationEnum.RECERTIFY))
self.operations.append(Operation(OperationEnum.LOCATE))
self.operations.append(Operation(OperationEnum.CHECK))
self.operations.append(Operation(OperationEnum.GET))
self.operations.append(Operation(OperationEnum.GET_ATTRIBUTES))
self.operations.append(Operation(OperationEnum.GET_ATTRIBUTE_LIST))
self.operations.append(Operation(OperationEnum.ADD_ATTRIBUTE))
self.operations.append(Operation(OperationEnum.MODIFY_ATTRIBUTE))
self.operations.append(Operation(OperationEnum.DELETE_ATTRIBUTE))
self.operations.append(Operation(OperationEnum.OBTAIN_LEASE))
self.operations.append(Operation(OperationEnum.GET_USAGE_ALLOCATION))
self.operations.append(Operation(OperationEnum.ACTIVATE))
self.operations.append(Operation(OperationEnum.REVOKE))
self.operations.append(Operation(OperationEnum.DESTROY))
self.operations.append(Operation(OperationEnum.ARCHIVE))
self.operations.append(Operation(OperationEnum.RECOVER))
self.operations.append(Operation(OperationEnum.QUERY))
self.operations.append(Operation(OperationEnum.CANCEL))
self.operations.append(Operation(OperationEnum.POLL))
self.operations.append(Operation(OperationEnum.REKEY_KEY_PAIR))
self.operations.append(Operation(OperationEnum.DISCOVER_VERSIONS))
self.object_types.append(ObjectType(ObjectTypeEnum.CERTIFICATE))
self.object_types.append(ObjectType(ObjectTypeEnum.SYMMETRIC_KEY))
self.object_types.append(ObjectType(ObjectTypeEnum.PUBLIC_KEY))
self.object_types.append(ObjectType(ObjectTypeEnum.PRIVATE_KEY))
self.object_types.append(ObjectType(ObjectTypeEnum.TEMPLATE))
self.object_types.append(ObjectType(ObjectTypeEnum.SECRET_DATA))
self.extension_information.append(ExtensionInformation(
extension_name=ExtensionName("ACME LOCATION")))
self.extension_information.append(ExtensionInformation(
extension_name=ExtensionName("ACME ZIP CODE")))
self.encoding_a = utils.BytearrayStream((
b'\x42\x00\x7C\x01\x00\x00\x00\x00'))
self.encoding_b = utils.BytearrayStream((
b'\x42\x00\x7C\x01\x00\x00\x02\x40\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x03\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x04\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x06\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x07\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x08\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x09\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x0A\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x0B\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x0C\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x0D\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x0E\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x0F\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x10\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x11\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x12\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x13\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x14\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x15\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x16\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x18\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x19\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x1A\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x1D\x00\x00\x00\x00\x42\x00\x5C\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x1E\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x01\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x02\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x03\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x04\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x06\x00\x00\x00\x00\x42\x00\x57\x05\x00\x00\x00\x04'
b'\x00\x00\x00\x07\x00\x00\x00\x00\x42\x00\x9D\x07\x00\x00\x00\x2E'
b'\x49\x42\x4D\x20\x74\x65\x73\x74\x20\x73\x65\x72\x76\x65\x72\x2C'
b'\x20\x6E\x6F\x74\x2D\x54\x4B\x4C\x4D\x20\x32\x2E\x30\x2E\x31\x2E'
b'\x31\x20\x4B\x4D\x49\x50\x20\x32\x2E\x30\x2E\x30\x2E\x31\x00\x00'
b'\x42\x00\x88\x01\x00\x00\x00\x00'))
self.encoding_c = utils.BytearrayStream((
b'\x42\x00\x7C\x01\x00\x00\x00\x40\x42\x00\xA4\x01\x00\x00\x00\x18'
b'\x42\x00\xA5\x07\x00\x00\x00\x0D\x41\x43\x4D\x45\x20\x4C\x4F\x43'
b'\x41\x54\x49\x4F\x4E\x00\x00\x00\x42\x00\xA4\x01\x00\x00\x00\x18'
b'\x42\x00\xA5\x07\x00\x00\x00\x0D\x41\x43\x4D\x45\x20\x5A\x49\x50'
b'\x20\x43\x4F\x44\x45\x00\x00\x00'))
def tearDown(self):
super(TestQueryResponsePayload, self).tearDown()
def test_init_with_none(self):
"""
Test that a QueryResponsePayload object can be constructed with no
specified value.
"""
query.QueryResponsePayload()
def test_init_with_args(self):
"""
Test that a QueryResponsePayload object can be constructed with valid
values.
"""
query.QueryResponsePayload(
operations=self.operations,
object_types=self.object_types,
vendor_identification=self.vendor_identification,
server_information=self.server_information,
application_namespaces=self.application_namespaces,
extension_information=self.extension_information)
def test_validate_with_invalid_operations_list(self):
"""
Test that a TypeError exception is raised when an invalid Operations
list is used to construct a QueryResponsePayload object.
"""
kwargs = {'operations': 'invalid'}
self.assertRaisesRegexp(
TypeError, "invalid operations list",
query.QueryResponsePayload, **kwargs)
def test_validate_with_invalid_operations_item(self):
"""
Test that a TypeError exception is raised when an invalid Operations
item is used to construct a QueryResponsePayload object.
"""
kwargs = {'operations': ['invalid']}
self.assertRaisesRegexp(
TypeError, "invalid operation",
query.QueryResponsePayload, **kwargs)
def test_validate_with_invalid_object_types_list(self):
"""
Test that a TypeError exception is raised when an invalid ObjectTypes
list is used to construct a QueryResponsePayload object.
"""
kwargs = {'object_types': 'invalid'}
self.assertRaisesRegexp(
TypeError, "invalid object types list",
query.QueryResponsePayload, **kwargs)
def test_validate_with_invalid_object_types_item(self):
"""
Test that a TypeError exception is raised when an invalid ObjectTypes
item is used to construct a QueryResponsePayload object.
"""
kwargs = {'object_types': ['invalid']}
self.assertRaisesRegexp(
TypeError, "invalid object type",
query.QueryResponsePayload, **kwargs)
def test_validate_with_invalid_vendor_identification(self):
"""
Test that a TypeError exception is raised when an invalid
VendorIdentification item is used to construct a QueryResponsePayload
object.
"""
kwargs = {'vendor_identification': 'invalid'}
self.assertRaisesRegexp(
TypeError, "invalid vendor identification",
query.QueryResponsePayload, **kwargs)
def test_validate_with_invalid_server_information(self):
"""
Test that a TypeError exception is raised when an invalid
ServerInformation item is used to construct a QueryResponsePayload
object.
"""
kwargs = {'server_information': 'invalid'}
self.assertRaisesRegexp(
TypeError, "invalid server information",
query.QueryResponsePayload, **kwargs)
def test_validate_with_invalid_application_namespaces_list(self):
"""
Test that a TypeError exception is raised when an invalid
ApplicationNamespaces list is used to construct a QueryResponsePayload
object.
"""
kwargs = {'application_namespaces': 'invalid'}
self.assertRaisesRegexp(
TypeError, "invalid application namespaces list",
query.QueryResponsePayload, **kwargs)
def test_validate_with_invalid_application_namespaces_item(self):
"""
Test that a TypeError exception is raised when an invalid
ApplicationNamespaces item is used to construct a QueryResponsePayload
object.
"""
kwargs = {'application_namespaces': ['invalid']}
self.assertRaisesRegexp(
TypeError, "invalid application namespace",
query.QueryResponsePayload, **kwargs)
def test_validate_with_invalid_extension_information_list(self):
"""
Test that a TypeError exception is raised when an invalid
ExtensionInformation list is used to construct a QueryResponsePayload
object.
"""
kwargs = {'extension_information': 'invalid'}
self.assertRaisesRegexp(
TypeError, "invalid extension information list",
query.QueryResponsePayload, **kwargs)
def test_validate_with_invalid_extension_information_item(self):
"""
Test that a TypeError exception is raised when an invalid
ExtensionInformation item is used to construct a QueryResponsePayload
object.
"""
kwargs = {'extension_information': ['invalid']}
self.assertRaisesRegexp(
TypeError, "invalid extension information",
query.QueryResponsePayload, **kwargs)
def _test_read(self, stream, operations, object_types,
vendor_identification, server_information,
application_namespaces, extension_information):
payload = query.QueryResponsePayload()
payload.read(stream)
# Test decoding of all operations.
expected = len(operations)
observed = len(payload.operations)
msg = "operations list decoding mismatch"
msg += "; expected {0} results, received {1}".format(
expected, observed)
self.assertEqual(expected, observed, msg)
for i in xrange(len(operations)):
expected = operations[i]
observed = payload.operations[i]
msg = "operation decoding mismatch"
msg += "; expected {0}, received {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
# Test decoding of all object types.
expected = len(object_types)
observed = len(payload.object_types)
msg = "object types list decoding mismatch"
msg += "; expected {0} results, received {1}".format(
expected, observed)
self.assertEqual(expected, observed, msg)
for i in xrange(len(object_types)):
expected = object_types[i]
observed = payload.object_types[i]
msg = "object type decoding mismatch"
msg += "; expected {0}, received {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
# Test decoding of vendor identification.
expected = vendor_identification
observed = payload.vendor_identification
msg = "vendor identification decoding mismatch"
msg += "; expected {0}, received {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
# Test decoding of server information.
expected = server_information
observed = payload.server_information
msg = "server information decoding mismatch"
msg += "; expected {0}, received {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
# Test decoding of all application namespaces.
expected = len(application_namespaces)
observed = len(payload.application_namespaces)
msg = "application namespaces list decoding mismatch"
msg += "; expected {0} results, received {1}".format(
expected, observed)
self.assertEqual(expected, observed, msg)
# Test decoding of all extension information.
expected = len(extension_information)
observed = len(payload.extension_information)
msg = "extension information list decoding mismatch"
msg += "; expected {0} results, received {1}".format(
expected, observed)
self.assertEqual(expected, observed, msg)
for i in xrange(len(extension_information)):
expected = extension_information[i]
observed = payload.extension_information[i]
msg = "extension information decoding mismatch"
msg += "; expected {0}, received {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
def test_read_with_no_data(self):
"""
Test that a QueryResponsePayload object with no data can be read from
a data stream.
"""
self._test_read(
self.encoding_a, list(), list(), None, None, list(), list())
def test_read_with_operations_object_types_and_server_info(self):
"""
Test that a QueryResponsePayload object with multiple pieces of data
can be read from a data stream.
"""
self._test_read(
self.encoding_b, self.operations, self.object_types,
self.vendor_identification, self.server_information,
self.application_namespaces, list())
def test_read_with_extension_information(self):
"""
Test that a QueryResponsePayload object with one piece of data can be
read from a data stream.
"""
self._test_read(
self.encoding_c, list(), list(), None, None,
self.application_namespaces, self.extension_information)
def _test_write(self, encoding, operations, object_types,
vendor_identification, server_information,
application_namespaces, extension_information):
stream = utils.BytearrayStream()
payload = query.QueryResponsePayload(
operations, object_types, vendor_identification,
server_information, application_namespaces, extension_information)
payload.write(stream)
length_expected = len(encoding)
length_received = len(stream)
msg = "encoding lengths not equal"
msg += "; expected {0}, received {1}".format(
length_expected, length_received)
self.assertEqual(length_expected, length_received, msg)
msg = "encoding mismatch"
msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(encoding, stream)
self.assertEqual(encoding, stream, msg)
def test_write_with_no_data(self):
"""
Test that a QueryResponsePayload object with no data can be written to
a data stream.
"""
self._test_write(
self.encoding_a, list(), list(), None, None, list(), list())
def test_write_with_operations_object_types_and_server_info(self):
"""
Test that a QueryResponsePayload object with multiple pieces of data
can be written to a data stream.
"""
self._test_write(
self.encoding_b, self.operations, self.object_types,
self.vendor_identification, self.server_information,
self.application_namespaces, list())
def test_write_with_extension_information(self):
"""
Test that a QueryResponsePayload object with one piece of data can be
written to a data stream.
"""
self._test_write(
self.encoding_c, list(), list(), None, None,
self.application_namespaces, self.extension_information)

View File

@ -0,0 +1,14 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,117 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six import string_types
from testtools import TestCase
from kmip.core.enums import QueryFunction as QueryFunctionEnum
from kmip.core.misc import QueryFunction
from kmip.core.misc import VendorIdentification
class TestQueryFunction(TestCase):
"""
A test suite for the QueryFunction class.
Since QueryFunction is a simple wrapper for the Enumeration primitive,
only a few tests pertaining to construction are needed.
"""
def setUp(self):
super(TestQueryFunction, self).setUp()
def tearDown(self):
super(TestQueryFunction, self).tearDown()
def _test_init(self, value):
if (isinstance(value, QueryFunctionEnum)) or (value is None):
query_function = QueryFunction(value)
msg = "expected {0}, observed {1}".format(
value, query_function.enum)
self.assertEqual(value, query_function.enum, msg)
else:
self.assertRaises(TypeError, QueryFunction, value)
def test_init_with_none(self):
"""
Test that a QueryFunction object can be constructed with no specified
value.
"""
self._test_init(None)
def test_init_with_valid(self):
"""
Test that a QueryFunction object can be constructed with a valid
QueryFunction enumeration value.
"""
self._test_init(QueryFunctionEnum.QUERY_OBJECTS)
def test_init_with_invalid(self):
"""
Test that a TypeError exception is raised when a non QueryFunction
enumeration value is used to construct a QueryFunction object.
"""
self._test_init("invalid")
class TestVendorIdentification(TestCase):
"""
A test suite for the VendorIdentification class.
Since VendorIdentification is a simple wrapper for the TextString
primitive, only a few tests pertaining to construction are needed.
"""
def setUp(self):
super(TestVendorIdentification, self).setUp()
def tearDown(self):
super(TestVendorIdentification, self).tearDown()
def _test_init(self, value):
if (isinstance(value, string_types)) or (value is None):
vendor_identification = VendorIdentification(value)
if value is None:
value = ''
msg = "expected {0}, observed {1}".format(
value, vendor_identification.value)
self.assertEqual(value, vendor_identification.value, msg)
else:
self.assertRaises(TypeError, VendorIdentification, value)
def test_init_with_none(self):
"""
Test that a VendorIdentification object can be constructed with no
specified value.
"""
self._test_init(None)
def test_init_with_valid(self):
"""
Test that a VendorIdentification object can be constructed with a
valid, string-type value.
"""
self._test_init("valid")
def test_init_with_invalid(self):
"""
Test that a TypeError exception is raised when a non-string value is
used to construct a VendorIdentification object.
"""
self._test_init(0)

View File

@ -0,0 +1,252 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six import string_types
from testtools import TestCase
from kmip.core.misc import ServerInformation
from kmip.core.utils import BytearrayStream
class TestServerInformation(TestCase):
"""
A test suite for the ServerInformation class.
"""
def setUp(self):
super(TestServerInformation, self).setUp()
self.data = BytearrayStream(b'\x00\x01\x02\x03')
self.encoding_a = BytearrayStream(
b'\x42\x00\x88\x01\x00\x00\x00\x00')
self.encoding_b = BytearrayStream(
b'\x42\x00\x88\x01\x00\x00\x00\x04\x00\x01\x02\x03')
def tearDown(self):
super(TestServerInformation, self).tearDown()
def test_init(self):
ServerInformation()
def _test_read(self, stream, data):
server_information = ServerInformation()
server_information.read(stream)
expected = data
observed = server_information.data
msg = "data decoding mismatch"
msg += "; expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
def test_read_with_none(self):
"""
Test that a ServerInformation object with no data can be read from a
data stream.
"""
self._test_read(self.encoding_a, BytearrayStream())
def test_read_with_data(self):
"""
Test that a ServerInformation object with data can be read from a
data stream.
"""
self._test_read(self.encoding_b, self.data)
def _test_write(self, stream_expected, data):
stream_observed = BytearrayStream()
server_information = ServerInformation()
if data is not None:
server_information.data = data
server_information.write(stream_observed)
length_expected = len(stream_expected)
length_observed = len(stream_observed)
msg = "encoding lengths not equal"
msg += "; expected {0}, observed {1}".format(
length_expected, length_observed)
self.assertEqual(length_expected, length_observed, msg)
msg = "encoding mismatch"
msg += ";\nexpected:\n{0}\nobserved:\n{1}".format(
stream_expected, stream_observed)
self.assertEqual(stream_expected, stream_observed, msg)
def test_write_with_none(self):
"""
Test that a ServerInformation object with no data can be written to a
data stream.
"""
self._test_write(self.encoding_a, None)
def test_write_with_data(self):
"""
Test that a ServerInformation object with data can be written to a
data stream.
"""
self._test_write(self.encoding_b, self.data)
def test_equal_on_equal(self):
"""
Test that the equality operator returns True when comparing two
ServerInformation objects with the same internal data.
"""
a = ServerInformation()
b = ServerInformation()
a.data = self.data
b.data = self.data
self.assertTrue(a == b)
self.assertTrue(b == a)
def test_equal_on_equal_and_empty(self):
"""
Test that the equality operator returns True when comparing two
ServerInformation objects with no internal data.
"""
a = ServerInformation()
b = ServerInformation()
self.assertTrue(a == b)
self.assertTrue(b == a)
def test_equal_on_not_equal(self):
"""
Test that the equality operator returns False when comparing two
ServerInformation objects with different sets of internal data.
"""
a = ServerInformation()
b = ServerInformation()
a.data = self.data
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_type_mismatch(self):
"""
Test that the equality operator returns False when comparing a
ServerInformation object to a non-ServerInformation object.
"""
a = ServerInformation()
b = "invalid"
self.assertFalse(a == b)
def test_not_equal_on_equal(self):
"""
Test that the inequality operator returns False when comparing
two ServerInformation objects with the same internal data.
"""
a = ServerInformation()
b = ServerInformation()
a.data = self.data
b.data = self.data
self.assertFalse(a != b)
self.assertFalse(b != a)
def test_not_equal_on_equal_and_empty(self):
"""
Test that the inequality operator returns False when comparing
two ServerInformation objects with no internal data.
"""
a = ServerInformation()
b = ServerInformation()
self.assertFalse(a != b)
self.assertFalse(b != a)
def test_not_equal_on_not_equal(self):
"""
Test that the inequality operator returns True when comparing two
ServerInformation objects with different sets of internal data.
"""
a = ServerInformation()
b = ServerInformation()
a.data = self.data
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_type_mismatch(self):
"""
Test that the inequality operator returns True when comparing a
ServerInformation object to a non-ServerInformation object.
"""
a = ServerInformation()
b = "invalid"
self.assertTrue(a != b)
def test_repr(self):
"""
Test that the representation of a ServerInformation object is
formatted properly and can be used by eval to create a new
ServerInformation object.
"""
server_information = ServerInformation()
expected = "ServerInformation()"
observed = repr(server_information)
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
expected = server_information
observed = eval(observed)
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
def _test_str(self, data):
server_information = ServerInformation()
server_information.data = data
str_repr = str(server_information)
expected = len(str(data))
observed = len(str_repr)
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
# TODO (peter-hamilton) This should be binary_type. Fix involves
# TODO (peter-hamilton) refining BytearrayStream implementation.
expected = string_types
observed = str_repr
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertIsInstance(observed, expected, msg)
def test_str_with_no_data(self):
"""
Test that the string representation of a ServerInformation object
is formatted properly when there is no internal data.
"""
self._test_str(BytearrayStream())
def test_str_with_data(self):
"""
Test that the string representation of a ServerInformation object
is formatted properly when there is internal data.
"""
self._test_str(self.data)

View File

@ -0,0 +1,14 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,444 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testtools import TestCase
from kmip.core.objects import ExtensionInformation
from kmip.core.objects import ExtensionName
from kmip.core.objects import ExtensionTag
from kmip.core.objects import ExtensionType
from kmip.core.utils import BytearrayStream
class TestExtensionInformation(TestCase):
"""
A test suite for the ExtensionInformation class.
Test encodings obtained from Section 12.2 of the KMIP 1.1 Test Cases
documentation.
"""
def setUp(self):
super(TestExtensionInformation, self).setUp()
self.extension_name_b = ExtensionName('ACME LOCATION')
self.extension_name_c = ExtensionName('ACME LOCATION')
self.extension_name_d = ExtensionName('ACME ZIP CODE')
self.extension_tag_c = ExtensionTag(5548545)
self.extension_tag_d = ExtensionTag(5548546)
self.extension_type_c = ExtensionType(7)
self.extension_type_d = ExtensionType(2)
self.encoding_a = BytearrayStream(
b'\x42\x00\xA4\x01\x00\x00\x00\x08\x42\x00\xA5\x07\x00\x00\x00'
b'\x00')
self.encoding_b = BytearrayStream(
b'\x42\x00\xA4\x01\x00\x00\x00\x18\x42\x00\xA5\x07\x00\x00\x00\x0D'
b'\x41\x43\x4D\x45\x20\x4C\x4F\x43\x41\x54\x49\x4F\x4E\x00\x00'
b'\x00')
self.encoding_c = BytearrayStream(
b'\x42\x00\xA4\x01\x00\x00\x00\x38\x42\x00\xA5\x07\x00\x00\x00\x0D'
b'\x41\x43\x4D\x45\x20\x4C\x4F\x43\x41\x54\x49\x4F\x4E\x00\x00\x00'
b'\x42\x00\xA6\x02\x00\x00\x00\x04\x00\x54\xAA\x01\x00\x00\x00\x00'
b'\x42\x00\xA7\x02\x00\x00\x00\x04\x00\x00\x00\x07\x00\x00\x00'
b'\x00')
self.encoding_d = BytearrayStream(
b'\x42\x00\xA4\x01\x00\x00\x00\x38\x42\x00\xA5\x07\x00\x00\x00\x0D'
b'\x41\x43\x4D\x45\x20\x5A\x49\x50\x20\x43\x4F\x44\x45\x00\x00\x00'
b'\x42\x00\xA6\x02\x00\x00\x00\x04\x00\x54\xAA\x02\x00\x00\x00\x00'
b'\x42\x00\xA7\x02\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00'
b'\x00')
def tearDown(self):
super(TestExtensionInformation, self).tearDown()
def _test_init(self):
pass
def test_init_with_none(self):
ExtensionInformation()
def test_init_with_args(self):
ExtensionInformation(
extension_name=ExtensionName(),
extension_tag=ExtensionTag(),
extension_type=ExtensionType())
def test_validate_with_invalid_extension_name(self):
"""
Test that a TypeError exception is raised when an invalid
ExtensionName is used to construct an ExtensionInformation object.
"""
kwargs = {'extension_name': 'invalid'}
self.assertRaisesRegexp(
TypeError, "invalid extension name",
ExtensionInformation, **kwargs)
def test_validate_with_invalid_extension_tag(self):
"""
Test that a TypeError exception is raised when an invalid
ExtensionTag is used to construct an ExtensionInformation object.
"""
kwargs = {'extension_tag': 'invalid'}
self.assertRaisesRegexp(
TypeError, "invalid extension tag",
ExtensionInformation, **kwargs)
def test_validate_with_invalid_extension_type(self):
"""
Test that a TypeError exception is raised when an invalid
ExtensionType is used to construct an ExtensionInformation object.
"""
kwargs = {'extension_type': 'invalid'}
self.assertRaisesRegexp(
TypeError, "invalid extension type",
ExtensionInformation, **kwargs)
def _test_read(self, stream, extension_name, extension_tag,
extension_type):
extension_information = ExtensionInformation()
extension_information.read(stream)
if extension_name is None:
extension_name = ExtensionName()
msg = "extension name encoding mismatch"
msg += "; expected {0}, observed {1}".format(
extension_name,
extension_information.extension_name)
self.assertEqual(
extension_name,
extension_information.extension_name, msg)
msg = "extension tag encoding mismatch"
msg += "; expected {0}, observed {1}".format(
extension_tag,
extension_information.extension_tag)
self.assertEqual(
extension_tag,
extension_information.extension_tag, msg)
msg = "extension type encoding mismatch"
msg += "; expected {0}, observed {1}".format(
extension_type,
extension_information.extension_type)
self.assertEqual(
extension_type,
extension_information.extension_type, msg)
def test_read_with_none(self):
"""
Test that an ExtensionInformation object with no data can be read from
a data stream.
"""
self._test_read(self.encoding_a, None, None, None)
def test_read_with_partial_args(self):
"""
Test that an ExtensionInformation object with some data can be read
from a data stream.
"""
self._test_read(self.encoding_b, self.extension_name_b, None, None)
def test_read_with_multiple_args(self):
"""
Test that an ExtensionInformation object with data can be read from a
data stream.
"""
self._test_read(self.encoding_c, self.extension_name_c,
self.extension_tag_c, self.extension_type_c)
def _test_write(self, stream_expected, extension_name, extension_tag,
extension_type):
stream_observed = BytearrayStream()
extension_information = ExtensionInformation(
extension_name=extension_name,
extension_tag=extension_tag,
extension_type=extension_type)
extension_information.write(stream_observed)
length_expected = len(stream_expected)
length_observed = len(stream_observed)
msg = "encoding lengths not equal"
msg += "; expected {0}, observed {1}".format(
length_expected, length_observed)
self.assertEqual(length_expected, length_observed, msg)
msg = "encoding mismatch"
msg += ";\nexpected:\n{0}\nobserved:\n{1}".format(
stream_expected, stream_observed)
self.assertEqual(stream_expected, stream_observed, msg)
def test_write_with_none(self):
"""
Test that an ExtensionInformation object with no data can be written
to a data stream.
"""
self._test_write(self.encoding_a, None, None, None)
def test_write_with_partial_args(self):
"""
Test that an ExtensionInformation object with some data can be written
to a data stream.
"""
self._test_write(self.encoding_b, self.extension_name_b, None, None)
def test_write_with_multiple_args(self):
"""
Test that an ExtensionInformation object with data can be written to
a data stream.
"""
self._test_write(self.encoding_c, self.extension_name_c,
self.extension_tag_c, self.extension_type_c)
def _test_create(self, extension_name, extension_tag, extension_type):
extension_information = ExtensionInformation.create(
extension_name=extension_name,
extension_tag=extension_tag,
extension_type=extension_type)
self.assertIsInstance(extension_information, ExtensionInformation)
expected = ExtensionName(extension_name)
observed = extension_information.extension_name
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
expected = ExtensionTag(extension_tag)
observed = extension_information.extension_tag
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
expected = ExtensionType(extension_type)
observed = extension_information.extension_type
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
def test_create_with_none(self):
"""
Test that an ExtensionInformation object with no data can be created
using the create class method.
"""
self._test_create(None, None, None)
def test_create_with_args(self):
"""
Test that an ExtensionInformation object with data can be created
using the create class method.
"""
self._test_create('ACME LOCATION', 5548545, 7)
def test_equal_on_equal(self):
"""
Test that the equality operator returns True when comparing two
ExtensionInformation objects with the same internal data.
"""
a = ExtensionInformation(
extension_name=self.extension_name_c,
extension_tag=self.extension_tag_c,
extension_type=self.extension_type_c)
b = ExtensionInformation(
extension_name=self.extension_name_c,
extension_tag=self.extension_tag_c,
extension_type=self.extension_type_c)
self.assertTrue(a == b)
self.assertTrue(b == a)
def test_equal_on_equal_and_empty(self):
"""
Test that the equality operator returns True when comparing two
ExtensionInformation objects with no internal data.
"""
a = ExtensionInformation()
b = ExtensionInformation()
self.assertTrue(a == b)
self.assertTrue(b == a)
def test_equal_on_not_equal(self):
"""
Test that the equality operator returns False when comparing two
ExtensionInformation objects with different sets of internal data.
"""
a = ExtensionInformation(
extension_name=self.extension_name_c,
extension_tag=self.extension_tag_c,
extension_type=self.extension_type_c)
b = ExtensionInformation()
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_type_mismatch(self):
"""
Test that the equality operator returns False when comparing an
ExtensionInformation object with a non-ExtensionInformation object.
"""
a = ExtensionInformation(
extension_name=self.extension_name_c,
extension_tag=self.extension_tag_c,
extension_type=self.extension_type_c)
b = "invalid"
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_not_equal_on_equal(self):
"""
Test that the inequality operator returns False when comparing two
ExtensionInformation objects with the same internal data.
"""
a = ExtensionInformation(
extension_name=self.extension_name_c,
extension_tag=self.extension_tag_c,
extension_type=self.extension_type_c)
b = ExtensionInformation(
extension_name=self.extension_name_c,
extension_tag=self.extension_tag_c,
extension_type=self.extension_type_c)
self.assertFalse(a != b)
self.assertFalse(b != a)
def test_not_equal_on_equal_and_empty(self):
"""
Test that the inequality operator returns False when comparing two
ExtensionInformation objects with no internal data.
"""
a = ExtensionInformation()
b = ExtensionInformation()
self.assertFalse(a != b)
self.assertFalse(b != a)
def test_not_equal_on_not_equal(self):
"""
Test that the inequality operator returns True when comparing two
ExtensionInformation objects with the different sets of internal data.
"""
a = ExtensionInformation(
extension_name=self.extension_name_c,
extension_tag=self.extension_tag_c,
extension_type=self.extension_type_c)
b = ExtensionInformation()
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_type_mismatch(self):
"""
Test that the inequality operator returns True when comparing an
ExtensionInformation object with a non-ExtensionInformation object.
"""
a = ExtensionInformation(
extension_name=self.extension_name_c,
extension_tag=self.extension_tag_c,
extension_type=self.extension_type_c)
b = "invalid"
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_repr_with_no_data(self):
"""
Test that the representation of an ExtensionInformation object with no
data is formatted properly and can be used by eval to create a new
ExtensionInformation object identical to the original.
"""
extension_information = ExtensionInformation()
expected = "ExtensionInformation("
expected += "extension_name=ExtensionName(value=''), "
expected += "extension_tag=None, "
expected += "extension_type=None)"
observed = repr(extension_information)
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
expected = extension_information
observed = eval(repr(extension_information))
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
def test_repr_with_data(self):
"""
Test that the representation of an ExtensionInformation object with
data is formatted properly and can be used by eval to create a new
ExtensionInformation object identical to the original.
"""
extension_information = ExtensionInformation(
extension_name=ExtensionName('ACME LOCATION'),
extension_tag=ExtensionTag(5548545),
extension_type=ExtensionType(7))
expected = "ExtensionInformation("
expected += "extension_name=ExtensionName(value='ACME LOCATION'), "
expected += "extension_tag=ExtensionTag(value=5548545), "
expected += "extension_type=ExtensionType(value=7))"
observed = repr(extension_information)
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
expected = extension_information
observed = eval(repr(extension_information))
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
def test_str_with_no_data(self):
"""
Test that the string representation of an ExtensionInformation object
is formatted properly when there is no internal data.
"""
extension_information = ExtensionInformation()
expected = "ExtensionInformation("
expected += "extension_name=ExtensionName(value=''), "
expected += "extension_tag=None, "
expected += "extension_type=None)"
observed = str(extension_information)
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
def test_str_with_data(self):
"""
Test that the string representation of an ExtensionInformation object
is formatted properly when there is internal data.
"""
extension_information = ExtensionInformation(
extension_name=ExtensionName('ACME LOCATION'),
extension_tag=ExtensionTag(5548545),
extension_type=ExtensionType(7))
expected = "ExtensionInformation("
expected += "extension_name=ExtensionName(value='ACME LOCATION'), "
expected += "extension_tag=ExtensionTag(value=5548545), "
expected += "extension_type=ExtensionType(value=7))"
observed = str(extension_information)
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)

View File

@ -0,0 +1,168 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six import string_types
from testtools import TestCase
from kmip.core.objects import ExtensionName
from kmip.core.objects import ExtensionTag
from kmip.core.objects import ExtensionType
class TestExtensionName(TestCase):
"""
A test suite for the ExtensionName class.
Since ExtensionName is a simple wrapper for the TextString primitive, only
a few tests pertaining to construction are needed.
"""
def setUp(self):
super(TestExtensionName, self).setUp()
def tearDown(self):
super(TestExtensionName, self).tearDown()
def _test_init(self, value):
if (isinstance(value, string_types)) or (value is None):
extension_name = ExtensionName(value)
if value is None:
value = ''
msg = "expected {0}, observed {1}".format(
value, extension_name.value)
self.assertEqual(value, extension_name.value, msg)
else:
self.assertRaises(TypeError, ExtensionName, value)
def test_init_with_none(self):
"""
Test that an ExtensionName object can be constructed with no specified
value.
"""
self._test_init(None)
def test_init_with_valid(self):
"""
Test that an ExtensionName object can be constructed with a valid
string value.
"""
self._test_init("valid")
def test_init_with_invalid(self):
"""
Test that a TypeError exception is raised when a non-string value is
used to construct an ExtensionName object.
"""
self._test_init(0)
class TestExtensionTag(TestCase):
"""
A test suite for the ExtensionTag class.
Since ExtensionTag is a simple wrapper for the Integer primitive, only a
few tests pertaining to construction are needed.
"""
def setUp(self):
super(TestExtensionTag, self).setUp()
def tearDown(self):
super(TestExtensionTag, self).tearDown()
def _test_init(self, value):
if (isinstance(value, int)) or (value is None):
extension_tag = ExtensionTag(value)
if value is None:
value = 0
msg = "expected {0}, observed {1}".format(
value, extension_tag.value)
self.assertEqual(value, extension_tag.value, msg)
else:
self.assertRaises(TypeError, ExtensionTag, value)
def test_init_with_none(self):
"""
Test that an ExtensionTag object can be constructed with no specified
value.
"""
self._test_init(None)
def test_init_with_valid(self):
"""
Test that an ExtensionTag object can be constructed with a valid
integer value.
"""
self._test_init(0)
def test_init_with_invalid(self):
"""
Test that a TypeError exception is raised when a non-integer value is
used to construct an ExtensionName object.
"""
self._test_init("invalid")
class TestExtensionType(TestCase):
"""
A test suite for the ExtensionType class.
Since ExtensionType is a simple wrapper for the Integer primitive, only a
few tests pertaining to construction are needed.
"""
def setUp(self):
super(TestExtensionType, self).setUp()
def tearDown(self):
super(TestExtensionType, self).tearDown()
def _test_init(self, value):
if (isinstance(value, int)) or (value is None):
extension_type = ExtensionType(value)
if value is None:
value = 0
msg = "expected {0}, observed {1}".format(
value, extension_type.value)
self.assertEqual(value, extension_type.value, msg)
else:
self.assertRaises(TypeError, ExtensionType, value)
def test_init_with_none(self):
"""
Test that an ExtensionType object can be constructed with no specified
value.
"""
self._test_init(None)
def test_init_with_valid(self):
"""
Test that an ExtensionType object can be constructed with a valid
integer value.
"""
self._test_init(0)
def test_init_with_invalid(self):
"""
Test that a TypeError exception is raised when a non-string value is
used to construct an ExtensionType object.
"""
self._test_init("invalid")

View File

@ -30,6 +30,7 @@ from kmip.core.enums import CryptographicUsageMask
from kmip.core.enums import ObjectType
from kmip.core.enums import Operation as OperationEnum
from kmip.core.enums import KeyFormatType
from kmip.core.enums import QueryFunction as QueryFunctionEnum
from kmip.core.enums import ResultStatus
from kmip.core.enums import ResultReason
@ -51,10 +52,15 @@ from kmip.core.messages.payloads.create_key_pair import \
CreateKeyPairRequestPayload, CreateKeyPairResponsePayload
from kmip.core.messages.payloads.discover_versions import \
DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload
from kmip.core.messages.payloads.query import \
QueryRequestPayload, QueryResponsePayload
from kmip.core.messages.payloads.rekey_key_pair import \
RekeyKeyPairRequestPayload, RekeyKeyPairResponsePayload
from kmip.core.misc import Offset
from kmip.core.misc import QueryFunction
from kmip.core.misc import ServerInformation
from kmip.core.misc import VendorIdentification
from kmip.core.objects import Attribute
from kmip.core.objects import CommonTemplateAttribute
@ -68,6 +74,7 @@ from kmip.services.kmip_client import KMIPProxy
from kmip.services.results import CreateKeyPairResult
from kmip.services.results import DiscoverVersionsResult
from kmip.services.results import QueryResult
from kmip.services.results import RekeyKeyPairResult
import kmip.core.utils as utils
@ -594,6 +601,41 @@ class TestKMIPClient(TestCase):
self._test_build_rekey_key_pair_batch_item(
None, None, None, None, None)
def _test_build_query_batch_item(self, query_functions):
batch_item = self.client._build_query_batch_item(query_functions)
base = "expected {0}, received {1}"
msg = base.format(RequestBatchItem, batch_item)
self.assertIsInstance(batch_item, RequestBatchItem, msg)
operation = batch_item.operation
msg = base.format(Operation, operation)
self.assertIsInstance(operation, Operation, msg)
operation_enum = operation.enum
msg = base.format(OperationEnum.QUERY, operation_enum)
self.assertEqual(OperationEnum.QUERY, operation_enum, msg)
payload = batch_item.request_payload
if query_functions is None:
query_functions = list()
msg = base.format(QueryRequestPayload, payload)
self.assertIsInstance(payload, QueryRequestPayload, msg)
query_functions_observed = payload.query_functions
self.assertEqual(query_functions, query_functions_observed)
def test_build_query_batch_item_with_input(self):
self._test_build_query_batch_item(
[QueryFunction(QueryFunctionEnum.QUERY_OBJECTS)])
def test_build_query_batch_item_without_input(self):
self._test_build_query_batch_item(None)
def _test_build_discover_versions_batch_item(self, protocol_versions):
batch_item = self.client._build_discover_versions_batch_item(
protocol_versions)
@ -680,6 +722,10 @@ class TestKMIPClient(TestCase):
self.assertRaisesRegexp(ValueError, "no processor for operation",
self.client._get_batch_item_processor, None)
def _test_equality(self, expected, observed):
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
def test_process_create_key_pair_batch_item(self):
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.CREATE_KEY_PAIR),
@ -698,6 +744,64 @@ class TestKMIPClient(TestCase):
msg = "expected {0}, received {1}".format(RekeyKeyPairResult, result)
self.assertIsInstance(result, RekeyKeyPairResult, msg)
def _test_process_query_batch_item(
self,
operations,
object_types,
vendor_identification,
server_information,
application_namespaces,
extension_information):
payload = QueryResponsePayload(
operations,
object_types,
vendor_identification,
server_information,
application_namespaces,
extension_information)
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.QUERY),
response_payload=payload)
result = self.client._process_query_batch_item(batch_item)
base = "expected {0}, observed {1}"
msg = base.format(QueryResult, result)
self.assertIsInstance(result, QueryResult, msg)
# The payload maps the following inputs to empty lists on None.
if operations is None:
operations = list()
if object_types is None:
object_types = list()
if application_namespaces is None:
application_namespaces = list()
if extension_information is None:
extension_information = list()
self._test_equality(operations, result.operations)
self._test_equality(object_types, result.object_types)
self._test_equality(
vendor_identification, result.vendor_identification)
self._test_equality(server_information, result.server_information)
self._test_equality(
application_namespaces, result.application_namespaces)
self._test_equality(
extension_information, result.extension_information)
def test_process_query_batch_item_with_results(self):
self._test_process_query_batch_item(
list(),
list(),
VendorIdentification(),
ServerInformation(),
list(),
list())
def test_process_query_batch_item_without_results(self):
self._test_process_query_batch_item(None, None, None, None, None, None)
def _test_process_discover_versions_batch_item(self, protocol_versions):
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.DISCOVER_VERSIONS),