Removing deprecated server code and supporting infrastructure

This change removes the original KMIPServer implementation, along
with all supporting classes. The KmipServer implementation is the
only supported server implementation going forward.
This commit is contained in:
Peter Hamilton 2017-02-08 15:29:10 -05:00
parent d9cf4c148a
commit c584ac0cb5
10 changed files with 0 additions and 1739 deletions

View File

@ -13,44 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
from kmip.core.attributes import CryptographicLength
from kmip.core.attributes import CryptographicAlgorithm
from kmip.core.attributes import ObjectType
from kmip.core.attributes import UniqueIdentifier
from kmip.core.enums import AttributeType as AT
from kmip.core.enums import CryptographicAlgorithm as CA
from kmip.core.enums import KeyFormatType as KeyFormatTypeEnum
from kmip.core.enums import ObjectType as OT
from kmip.core.enums import ResultReason as ResultReasonEnum
from kmip.core.enums import ResultStatus as RS
from kmip.core.factories.attributes import AttributeFactory
from kmip.core.factories.keys import KeyFactory
from kmip.core.factories.secrets import SecretFactory
from kmip.core.messages.contents import ResultStatus
from kmip.core.messages.contents import ResultReason
from kmip.core.messages.contents import ResultMessage
from kmip.core.messages.contents import ProtocolVersion
from kmip.core.misc import KeyFormatType
from kmip.core.objects import KeyBlock
from kmip.core.objects import KeyMaterial
from kmip.core.objects import KeyValue
from kmip.core.objects import TemplateAttribute
from kmip.core.secrets import SymmetricKey
from kmip.services.server.repo.mem_repo import MemRepo
from kmip.services.results import CreateResult
from kmip.services.results import DestroyResult
from kmip.services.results import GetResult
from kmip.services.results import OperationResult
from kmip.services.results import RegisterResult
from kmip.services.results import LocateResult
from kmip.services.results import DiscoverVersionsResult
class KMIP(object):
@ -89,334 +51,3 @@ class KMIP(object):
def discover_versions(self, protocol_versions=None):
raise NotImplementedError()
class KMIPImpl(KMIP):
def __init__(self):
super(KMIPImpl, self).__init__()
self.logger = logging.getLogger(__name__)
self.key_factory = KeyFactory()
self.secret_factory = SecretFactory()
self.attribute_factory = AttributeFactory()
self.repo = MemRepo()
self.protocol_versions = [
ProtocolVersion.create(1, 1),
ProtocolVersion.create(1, 0)
]
def create(self, object_type, template_attribute, credential=None):
self.logger.debug('create() called')
self.logger.debug('object type = %s' % object_type)
bit_length = 256
attributes = template_attribute.attributes
ret_attributes = []
if object_type.value != OT.SYMMETRIC_KEY:
self.logger.debug('invalid object type')
return self._get_invalid_field_result('invalid object type')
try:
alg_attr = self._validate_req_field(
attributes, AT.CRYPTOGRAPHIC_ALGORITHM.value,
(CA.AES,), 'unsupported algorithm')
len_attr = self._validate_req_field(
attributes, AT.CRYPTOGRAPHIC_LENGTH.value,
(128, 256, 512), 'unsupported key length', False)
self._validate_req_field(
attributes, AT.CRYPTOGRAPHIC_USAGE_MASK.value, (), '')
except InvalidFieldException as e:
self.logger.debug('InvalidFieldException raised')
return e.result
crypto_alg = CryptographicAlgorithm(CA(alg_attr.attribute_value.value))
if len_attr is None:
self.logger.debug('cryptographic length not supplied')
attribute_type = AT.CRYPTOGRAPHIC_LENGTH
length_attribute = self.attribute_factory.\
create_attribute(attribute_type, bit_length)
attributes.append(length_attribute)
ret_attributes.append(length_attribute)
else:
bit_length = len_attr.attribute_value.value
key = self._gen_symmetric_key(bit_length, crypto_alg)
s_uuid, uuid_attribute = self._save(key, attributes)
ret_attributes.append(uuid_attribute)
template_attribute = TemplateAttribute(attributes=ret_attributes)
return CreateResult(ResultStatus(RS.SUCCESS), object_type=object_type,
uuid=UniqueIdentifier(s_uuid),
template_attribute=template_attribute)
def create_key_pair(self, common_template_attribute,
private_key_template_attribute,
public_key_template_attribute):
raise NotImplementedError()
def register(self, object_type, template_attribute, secret,
credential=None):
self.logger.debug('register() called')
self.logger.debug('object type = %s' % object_type)
attributes = template_attribute.attributes
ret_attributes = []
if object_type is None:
self.logger.debug('invalid object type')
return self._get_missing_field_result('object type')
if object_type.value != OT.SYMMETRIC_KEY:
self.logger.debug('invalid object type')
return self._get_invalid_field_result('invalid object type')
if secret is None or not isinstance(secret, SymmetricKey):
msg = 'object type does not match that of secret'
self.logger.debug(msg)
return self._get_invalid_field_result(msg)
self.logger.debug('Collecting all attributes')
if attributes is None:
attributes = []
attributes.extend(self._get_key_block_attributes(secret.key_block))
self.logger.debug('Verifying all attributes are valid and set')
try:
self._validate_req_field(
attributes, AT.CRYPTOGRAPHIC_ALGORITHM.value, (CA.AES,),
'unsupported algorithm')
self._validate_req_field(
attributes, AT.CRYPTOGRAPHIC_LENGTH.value, (128, 256, 512),
'unsupported key length')
self._validate_req_field(
attributes, AT.CRYPTOGRAPHIC_USAGE_MASK.value, (), '')
except InvalidFieldException as e:
self.logger.debug('InvalidFieldException raised')
return RegisterResult(e.result.result_status,
e.result.result_reason,
e.result.result_message)
s_uuid, uuid_attribute = self._save(secret, attributes)
ret_attributes.append(uuid_attribute)
template_attribute = TemplateAttribute(attributes=ret_attributes)
return RegisterResult(ResultStatus(RS.SUCCESS),
uuid=UniqueIdentifier(s_uuid),
template_attribute=template_attribute)
def rekey_key_pair(self, private_key_unique_identifier,
offset, common_template_attribute,
private_key_template_attribute,
public_key_template_attribute):
raise NotImplementedError()
def get(self,
uuid=None,
key_format_type=None,
key_compression_type=None,
key_wrapping_specification=None,
credential=None):
self.logger.debug('get() called')
ret_value = RS.OPERATION_FAILED
if uuid is None or not hasattr(uuid, 'value'):
self.logger.debug('no uuid provided')
reason = ResultReason(ResultReasonEnum.ITEM_NOT_FOUND)
message = ResultMessage('')
return GetResult(ResultStatus(ret_value), reason, message)
if key_format_type is None:
self.logger.debug('key format type is None, setting to raw')
key_format_type = KeyFormatType(KeyFormatTypeEnum.RAW)
if key_format_type.value != KeyFormatTypeEnum.RAW:
self.logger.debug('key format type is not raw')
reason = ResultReason(ResultReasonEnum.
KEY_FORMAT_TYPE_NOT_SUPPORTED)
message = ResultMessage('')
return GetResult(ResultStatus(ret_value), reason, message)
if key_compression_type is not None:
self.logger.debug('key compression type is not None')
reason = ResultReason(ResultReasonEnum.
KEY_COMPRESSION_TYPE_NOT_SUPPORTED)
message = ResultMessage('')
return GetResult(ResultStatus(ret_value), reason, message)
if key_wrapping_specification is not None:
self.logger.debug('key wrapping specification is not None')
reason = ResultReason(ResultReasonEnum.FEATURE_NOT_SUPPORTED)
message = ResultMessage('key wrapping is not currently supported')
return GetResult(ResultStatus(ret_value), reason, message)
self.logger.debug('retrieving object from repo')
managed_object, _ = self.repo.get(uuid.value)
if managed_object is None:
self.logger.debug('object not found in repo')
reason = ResultReason(ResultReasonEnum.ITEM_NOT_FOUND)
message = ResultMessage('')
return GetResult(ResultStatus(ret_value), reason, message)
# currently only symmetric keys are supported, fix this in future
object_type = ObjectType(OT.SYMMETRIC_KEY)
ret_value = RS.SUCCESS
return GetResult(ResultStatus(ret_value), object_type=object_type,
uuid=uuid, secret=managed_object)
def destroy(self, uuid):
self.logger.debug('destroy() called')
ret_value = RS.OPERATION_FAILED
if uuid is None or not hasattr(uuid, 'value'):
self.logger.debug('no uuid provided')
reason = ResultReason(ResultReasonEnum.ITEM_NOT_FOUND)
message = ResultMessage('')
return DestroyResult(ResultStatus(ret_value), reason, message)
msg = 'deleting object from repo: {0}'.format(uuid)
self.logger.debug(msg)
if not self.repo.delete(uuid.value):
self.logger.debug('repo did not find and delete managed object')
reason = ResultReason(ResultReasonEnum.ITEM_NOT_FOUND)
message = ResultMessage('')
return DestroyResult(ResultStatus(ret_value), reason, message)
ret_value = RS.SUCCESS
return DestroyResult(ResultStatus(ret_value), uuid=uuid)
def locate(self, maximum_items=None, storage_status_mask=None,
object_group_member=None, attributes=None,
credential=None):
self.logger.debug('locate() called')
msg = 'locating object(s) from repo'
self.logger.debug(msg)
try:
uuids = self.repo.locate(maximum_items, storage_status_mask,
object_group_member, attributes)
return LocateResult(ResultStatus(RS.SUCCESS), uuids=uuids)
except NotImplementedError:
msg = ResultMessage('Locate Operation Not Supported')
reason = ResultReason(ResultReasonEnum.OPERATION_NOT_SUPPORTED)
return LocateResult(ResultStatus(RS.OPERATION_FAILED),
result_reason=reason, result_message=msg)
def discover_versions(self, protocol_versions=None):
self.logger.debug(
"discover_versions(protocol_versions={0}) called".format(
protocol_versions))
msg = 'get protocol versions supported by server'
result_versions = list()
if protocol_versions:
msg += " and client; client versions {0}".format(protocol_versions)
for version in protocol_versions:
if version in self.protocol_versions:
result_versions.append(version)
else:
result_versions = self.protocol_versions
self.logger.debug(msg)
try:
return DiscoverVersionsResult(ResultStatus(RS.SUCCESS),
protocol_versions=result_versions)
except Exception:
msg = ResultMessage('DiscoverVersions Operation Failed')
reason = ResultReason(ResultReasonEnum.GENERAL_FAILURE)
return DiscoverVersionsResult(ResultStatus(RS.OPERATION_FAILED),
result_reason=reason,
result_message=msg)
def _validate_req_field(self, attrs, name, expected, msg, required=True):
self.logger.debug('Validating attribute %s' % name)
seen = False
found_attr = None
for attr in attrs:
if self._validate_field(attr, name, expected, msg):
if seen:
# TODO check what spec says to do on this
msg = 'duplicate attribute: %s' % name
self.logger.debug(msg)
result = self._get_duplicate_attribute_result(name)
raise InvalidFieldException(result)
seen = True
found_attr = attr
if required and not seen:
result = self._get_missing_field_result(name)
raise InvalidFieldException(result)
return found_attr
def _validate_field(self, attr, name, expected, msg):
if attr.attribute_name.value == name:
self.logger.debug('validating attribute %s' % name)
if not expected or attr.attribute_value.value in expected:
self.logger.debug('attribute validated')
return True
else:
self.logger.debug('attribute not validated')
result = self._get_invalid_field_result(msg)
raise InvalidFieldException(result)
else:
return False
def _get_invalid_field_result(self, msg):
status = ResultStatus(RS.OPERATION_FAILED)
reason = ResultReason(ResultReasonEnum.INVALID_FIELD)
message = ResultMessage(msg)
return OperationResult(status, reason, message)
def _get_missing_field_result(self, name):
msg = '%s not supplied' % name
self.logger.debug(msg)
status = ResultStatus(RS.OPERATION_FAILED)
reason = ResultReason(ResultReasonEnum.ITEM_NOT_FOUND)
message = ResultMessage(msg)
return OperationResult(status, reason, message)
def _get_duplicate_attribute_result(self, name):
msg = '%s supplied multiple times' % name
self.logger.debug(msg)
status = ResultStatus(RS.OPERATION_FAILED)
reason = ResultReason(ResultReasonEnum.INDEX_OUT_OF_BOUNDS)
message = ResultMessage(msg)
return OperationResult(status, reason, message)
def _gen_symmetric_key(self, bit_length, crypto_alg):
key_format_type = KeyFormatType(KeyFormatTypeEnum.RAW)
key_material = KeyMaterial(os.urandom(int(bit_length/8)))
key_value = KeyValue(key_material)
crypto_length = CryptographicLength(bit_length)
key_block = KeyBlock(key_format_type, None, key_value, crypto_alg,
crypto_length, None)
return SymmetricKey(key_block)
def _save(self, key, attributes):
s_uuid = self.repo.save(key, attributes)
self.logger.debug('creating object with uuid = %s' % s_uuid)
attribute_type = AT.UNIQUE_IDENTIFIER
attribute = self.attribute_factory.create_attribute(attribute_type,
s_uuid)
attributes.append(attribute)
# Calling update to also store the UUID
self.repo.update(s_uuid, key, attributes)
return s_uuid, attribute
def _get_key_block_attributes(self, key_block):
self.logger.debug('getting all key attributes from key block')
attributes = []
if key_block.cryptographic_algorithm is not None:
self.logger.debug('crypto_alg set on key block')
self.logger.debug('adding crypto algorithm attribute')
at = AT.CRYPTOGRAPHIC_ALGORITHM
alg = key_block.cryptographic_algorithm.value
attributes.append(self.attribute_factory.create_attribute(at, alg))
if key_block.cryptographic_length is not None:
self.logger.debug('crypto_length set on key block')
self.logger.debug('adding crypto length attribute')
at = AT.CRYPTOGRAPHIC_LENGTH
len = key_block.cryptographic_length.value
attributes.append(self.attribute_factory.create_attribute(at, len))
self.logger.debug('getting key value attributes')
if key_block.key_wrapping_data is not None:
self.logger.debug('no wrapping data so key value is struct')
kv = key_block.key_value
if isinstance(kv, KeyValue):
kv = key_block.key_value
if kv.attributes is not None:
self.logger.debug('adding the key value struct attributes')
attributes.extend(kv.attributes)
return attributes
class InvalidFieldException(Exception):
def __init__(self, result):
super(InvalidFieldException, self).__init__()
self.result = result

View File

@ -1,126 +0,0 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import socket
import ssl
import warnings
from kmip.core.config_helper import ConfigHelper
from kmip.core.server import KMIPImpl
from kmip.services.server.kmip_protocol import KMIPProtocolFactory
from kmip.services.server.processor import Processor
FILE_PATH = os.path.dirname(os.path.abspath(__file__))
class KMIPServer(object):
def __init__(self, host=None, port=None, keyfile=None, certfile=None,
cert_reqs=None, ssl_version=None, ca_certs=None,
do_handshake_on_connect=None, suppress_ragged_eofs=None):
warnings.simplefilter("always")
warnings.warn((
"Please use the newer KmipServer located in kmip.services.server. "
"This version of the server will be deprecated in the future."),
PendingDeprecationWarning
)
warnings.simplefilter("default")
self.logger = logging.getLogger(__name__)
self._set_variables(host, port, keyfile, certfile, cert_reqs,
ssl_version, ca_certs, do_handshake_on_connect,
suppress_ragged_eofs)
handler = KMIPImpl()
self._processor = Processor(handler)
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((self.host, self.port))
def close(self):
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
def serve(self):
self.socket.listen(0)
while True:
connection, address = self.socket.accept()
self.logger.info("Connected by {0}".format(address))
connection = ssl.wrap_socket(
connection,
keyfile=self.keyfile,
certfile=self.certfile,
server_side=True,
cert_reqs=self.cert_reqs,
ssl_version=self.ssl_version,
ca_certs=self.ca_certs,
do_handshake_on_connect=self.do_handshake_on_connect,
suppress_ragged_eofs=self.suppress_ragged_eofs)
factory = KMIPProtocolFactory()
protocol = factory.getProtocol(connection)
try:
while True:
self._processor.process(protocol, protocol)
except EOFError as e:
self.logger.warning("KMIPServer {0} {1}".format(type(e), e))
except Exception as e:
self.logger.error('KMIPServer {0} {1}'.format(type(e), e))
finally:
connection.close()
self.logger.info('Connection closed')
def _set_variables(self, host, port, keyfile, certfile, cert_reqs,
ssl_version, ca_certs, do_handshake_on_connect,
suppress_ragged_eofs):
conf = ConfigHelper()
self.host = conf.get_valid_value(host, 'server',
'host', conf.DEFAULT_HOST)
self.port = int(conf.get_valid_value(port, 'server',
'port', conf.DEFAULT_PORT))
self.keyfile = conf.get_valid_value(
keyfile, 'server', 'keyfile', conf.DEFAULT_KEYFILE)
self.certfile = conf.get_valid_value(
certfile, 'server', 'certfile', conf.DEFAULT_CERTFILE)
self.cert_reqs = getattr(ssl, conf.get_valid_value(
cert_reqs, 'server', 'cert_reqs', 'CERT_NONE'))
self.ssl_version = getattr(ssl, conf.get_valid_value(
ssl_version, 'server', 'ssl_version', conf.DEFAULT_SSL_VERSION))
self.ca_certs = conf.get_valid_value(
ca_certs, 'server', 'ca_certs', None)
if conf.get_valid_value(
do_handshake_on_connect, 'server',
'do_handshake_on_connect', 'True') == 'True':
self.do_handshake_on_connect = True
else:
self.do_handshake_on_connect = False
if conf.get_valid_value(
suppress_ragged_eofs, 'server',
'suppress_ragged_eofs', 'True') == 'True':
self.suppress_ragged_eofs = True
else:
self.suppress_ragged_eofs = False

View File

@ -1,299 +0,0 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import time
from kmip.core.messages.messages import RequestMessage
from kmip.core.messages.messages import ResponseMessage
from kmip.core.messages.messages import ResponseBatchItem
from kmip.core.messages.messages import ResponseHeader
from kmip.core.messages.contents import AsynchronousIndicator
from kmip.core.messages.contents import BatchErrorContinuationOption
from kmip.core.messages.contents import BatchCount
from kmip.core.messages.contents import TimeStamp
from kmip.core.primitives import Base
from kmip.core.messages.payloads.create import CreateResponsePayload
from kmip.core.messages.payloads.get import GetResponsePayload
from kmip.core.messages.payloads.destroy import DestroyResponsePayload
from kmip.core.messages.payloads.register import RegisterResponsePayload
from kmip.core.messages.payloads.locate import LocateResponsePayload
from kmip.core.messages.payloads.discover_versions import \
DiscoverVersionsResponsePayload
from kmip.core.enums import Operation
from kmip.core.enums import ResultStatus as RS
from kmip.core.enums import Tags
from kmip.core.enums import BatchErrorContinuationOption as BECO
from kmip.core.utils import BytearrayStream
class Processor(object):
def __init__(self, handler):
self.logger = logging.getLogger(__name__)
self._handler = handler
def process(self, istream, ostream):
stream = istream.read()
if Base.is_tag_next(Tags.REQUEST_MESSAGE, stream):
message = RequestMessage()
message.read(stream)
try:
result = self._process_request(message)
except Exception as e:
raise e
tstream = BytearrayStream()
result.write(tstream)
ostream.write(tstream.buffer)
elif Base.is_tag_next(Tags.RESPONSE_MESSAGE, stream):
message = ResponseMessage()
message.read(stream)
self._process_response(message)
else:
raise ValueError('Processing error: stream contains unknown '
'message type')
def _process_request(self, message):
header = message.request_header
protocol_version = header.protocol_version
# maximum_response_size = header.maximum_response_size
asynchronous_indicator = header.asynchronous_indicator
# authentication = header.authentication
batch_error_cont_option = header.batch_error_cont_option
# batch_order_option = header.batch_order_option
# time_stamp = header.time_stamp
request_batch_count = header.batch_count.value
# TODO (peter-hamilton) Log receipt of message with time stamp
if asynchronous_indicator is None:
asynchronous_indicator = AsynchronousIndicator(False)
if batch_error_cont_option is None:
batch_error_cont_option = BatchErrorContinuationOption(BECO.STOP)
request_batch_items = message.batch_items
response_batch_items = []
for i in range(request_batch_count):
request_batch_item = request_batch_items[i]
failure_occurred = False
operation = request_batch_item.operation
ubi_id = request_batch_item.unique_batch_item_id
payload = request_batch_item.request_payload
message_extension = request_batch_item.message_extension
result = self._process_operation(operation, payload)
result_status = result[0]
result_reason = result[1]
result_message = result[2]
asyn_cv = None
response_payload = None
message_extension = None
if result_status.value is RS.SUCCESS:
response_payload = result[3]
elif result_status.value is RS.OPERATION_FAILED:
failure_occurred = True
result_reason = result[1]
elif result_status.value is RS.OPERATION_PENDING:
# TODO (peter-hamilton) Need to add a way to track async
# TODO (peter-hamilton) operations.
asyn_cv = b'\x00'
elif result_status.value is RS.OPERATION_UNDONE:
result_reason = result[1]
else:
msg = 'Unrecognized operation result status: {0}'
raise RuntimeError(msg.format(result_status))
resp_bi = ResponseBatchItem(operation=operation,
unique_batch_item_id=ubi_id,
result_status=result_status,
result_reason=result_reason,
result_message=result_message,
async_correlation_value=asyn_cv,
response_payload=response_payload,
message_extension=message_extension)
response_batch_items.append(resp_bi)
if failure_occurred:
if batch_error_cont_option.value is BECO.STOP:
break
elif batch_error_cont_option.value is BECO.UNDO:
# TODO (peter-hamilton) Tell client to undo operations.
# TODO (peter-hamilton) Unclear what response should be.
break
elif batch_error_cont_option.value is BECO.CONTINUE:
continue
else:
msg = 'Unrecognized batch error continuation option: {0}'
raise RuntimeError(msg.format(batch_error_cont_option))
response_batch_count = BatchCount(len(response_batch_items))
response_time_stamp = TimeStamp(int(time.time()))
response_header = ResponseHeader(protocol_version=protocol_version,
time_stamp=response_time_stamp,
batch_count=response_batch_count)
response_message = ResponseMessage(response_header=response_header,
batch_items=response_batch_items)
return response_message
def _process_response(self, message):
raise NotImplementedError()
def _process_operation(self, operation, payload):
op = operation.value
if op is Operation.CREATE:
return self._process_create_request(payload)
elif op is Operation.GET:
return self._process_get_request(payload)
elif op is Operation.DESTROY:
return self._process_destroy_request(payload)
elif op is Operation.REGISTER:
return self._process_register_request(payload)
elif op is Operation.LOCATE:
return self._process_locate_request(payload)
elif op is Operation.DISCOVER_VERSIONS:
return self._process_discover_versions_request(payload)
else:
self.logger.debug("Process operation: Not implemented")
raise NotImplementedError()
def _process_create_request(self, payload):
object_type = payload.object_type
template_attribute = payload.template_attribute
result = self._handler.create(object_type, template_attribute)
result_status = result.result_status
result_reason = result.result_reason
result_message = result.result_message
created_type = result.object_type
uuid = result.uuid
template_attribute = result.template_attribute
resp_pl = CreateResponsePayload(object_type=created_type,
unique_identifier=uuid,
template_attribute=template_attribute)
return (result_status, result_reason, result_message, resp_pl)
def _process_get_request(self, payload):
uuid = None
kft = None
kct = None
unique_identifier = payload.unique_identifier
key_format_type = payload.key_format_type
key_compression_type = payload.key_compression_type
key_wrapping_specification = payload.key_wrapping_specification
if unique_identifier is not None:
uuid = unique_identifier
if key_format_type is not None:
kft = key_format_type
if key_compression_type is not None:
kct = key_compression_type
result = self._handler.get(uuid, kft, kct,
key_wrapping_specification)
result_status = result.result_status
result_reason = result.result_reason
result_message = result.result_message
retrieved_type = result.object_type
uuid = result.uuid
secret = result.secret
resp_pl = GetResponsePayload(object_type=retrieved_type,
unique_identifier=uuid,
secret=secret)
return (result_status, result_reason, result_message, resp_pl)
def _process_destroy_request(self, payload):
uuid = payload.unique_identifier
result = self._handler.destroy(uuid)
result_status = result.result_status
result_reason = result.result_reason
result_message = result.result_message
uuid = result.uuid
payload = DestroyResponsePayload(unique_identifier=uuid)
return (result_status, result_reason, result_message, payload)
def _process_register_request(self, payload):
object_type = payload.object_type
template_attribute = payload.template_attribute
secret = payload.secret
result = self._handler.register(object_type, template_attribute,
secret)
result_status = result.result_status
result_reason = result.result_reason
result_message = result.result_message
uuid = result.uuid
template_attr = result.template_attribute
resp_pl = RegisterResponsePayload(unique_identifier=uuid,
template_attribute=template_attr)
return (result_status, result_reason, result_message, resp_pl)
def _process_locate_request(self, payload):
max_items = payload.maximum_items
storage_mask = payload.storage_status_mask
objgrp_member = payload.object_group_member
attributes = payload.attributes
result = self._handler.locate(max_items, storage_mask,
objgrp_member, attributes)
result_status = result.result_status
result_reason = result.result_reason
result_message = result.result_message
uuids = result.uuids
resp_pl = LocateResponsePayload(unique_identifiers=uuids)
return (result_status, result_reason, result_message, resp_pl)
def _process_discover_versions_request(self, payload):
protocol_versions = payload.protocol_versions
result = self._handler.discover_versions(
protocol_versions=protocol_versions)
result_status = result.result_status
result_reason = result.result_reason
result_protocol_versions = result.protocol_versions
result_message = result.result_message
resp_pl = DiscoverVersionsResponsePayload(
protocol_versions=result_protocol_versions)
return (result_status, result_reason, result_message, resp_pl)

View File

@ -1,14 +0,0 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -1,51 +0,0 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kmip.services.server.repo.repo import ManagedObjectRepo
class MemRepo(ManagedObjectRepo):
def __init__(self):
self.repo = {}
self.uuid = 1
def save(self, managed_object, attributes):
# TODO (nate) verify the parameters
uuid = "{0}".format(self.uuid)
self.repo[uuid] = (managed_object, attributes)
self.uuid += 1
return uuid
def get(self, uuid):
if uuid is None or uuid not in self.repo:
return (None, None)
return self.repo[uuid]
def update(self, uuid, managed_object, attributes):
if uuid is None:
return False
self.repo[uuid] = (managed_object, attributes)
return True
def delete(self, uuid):
if uuid is None or uuid not in self.repo:
return False
del self.repo[uuid]
return True
def locate(self, maximum_items, storage_status_mask,
object_group_member, attributes):
raise NotImplementedError

View File

@ -1,73 +0,0 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class ManagedObjectRepo(object):
"""Stores and manages KMIP managed objects.
The KMIP specification details the managed objects that are stored by a
KMIP server. This repository abstraction is an interface for KMIP servers
to store managed objects.
"""
def __init__(self):
pass
def save(self, managed_object, attributes):
"""Save a managed object
This saves a managed object into the repository and returns a UUID
string that can be used to reference the object in the repository.
:param managed_object: managed object to save from secrets.py
:param attributes: attributes to store with the managed object
:returns: a UUID string that can be used to retrieve the object later
"""
raise NotImplementedError
def get(self, uuid):
"""Retrieve a managed object
Retrieve a managed object from the repository. The UUID is used to
identify the managed object to return. The UUID is returned from the
save call.
A tuple is returned that contains the managed object and all of its
attributes.
:param uuid: UUID of the managed object
:returns: (managed_object, attributes) if object exists, otherwise
(None, None)
"""
raise NotImplementedError
def update(self, uuid, managed_object, attributes):
"""Updates a managed object
Updates the values for a managed_object.
:param uuid: UUID of the managed object
:param managed_object: managed object
:param attributes: attributes to store with the managed object
:returns: True if object existed and successfully updated, otherwise
False
"""
raise NotImplementedError
def delete(self, uuid):
"""Delete a managed object from the repository
Delete a managed object from the repository.
:param uuid: UUID of the managed object
:returns: True if successfully deleted, False if not found
"""
raise NotImplementedError

View File

@ -1,96 +0,0 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import optparse
import os
import sys
from kmip.core.config_helper import ConfigHelper
from kmip.services.server.kmip_server import KMIPServer
FILE_PATH = os.path.dirname(os.path.abspath(__file__))
def run_server(host, port, certfile, keyfile, cert_reqs, ssl_version,
ca_certs, do_handshake_on_connect, suppress_ragged_eofs):
logger = logging.getLogger(__name__)
server = KMIPServer(host=host, port=port, keyfile=keyfile,
certfile=certfile, cert_reqs=cert_reqs,
ssl_version=ssl_version, ca_certs=ca_certs,
do_handshake_on_connect=do_handshake_on_connect,
suppress_ragged_eofs=suppress_ragged_eofs)
logger.info('Starting the KMIP server')
try:
server.serve()
except KeyboardInterrupt:
logger.info('KeyboardInterrupt received while serving')
except Exception as e:
logger.info('Exception received while serving: {0}'.format(e))
finally:
server.close()
logger.info('Shutting down KMIP server')
def build_cli_parser():
parser = optparse.OptionParser(usage="%prog [options]",
description="Run KMIP Server")
parser.add_option("-n", "--host", action="store", default='127.0.0.1',
dest="host",
help="Hostname/IP address of platform running the KMIP "
"server (e.g., localhost, 127.0.0.1)")
parser.add_option("-p", "--port", action="store", default=5696,
dest="port", help="Port number for KMIP services")
parser.add_option("-k", "--keyfile", action="store",
default=os.path.normpath(os.path.join(
FILE_PATH, '../utils/certs/server.key')),
dest="keyfile")
parser.add_option("-c", "--certfile", action="store",
default=os.path.normpath(os.path.join(
FILE_PATH, '../utils/certs/server.crt')),
dest="certfile")
parser.add_option("-r", "--cert_reqs", action="store",
default="CERT_NONE", dest="cert_reqs")
parser.add_option("-s", "--ssl_version", action="store",
default='PROTOCOL_SSLv23', dest="ssl_version")
parser.add_option("-a", "--ca_certs", action="store",
default=ConfigHelper.NONE_VALUE, dest="ca_certs")
parser.add_option("-d", "--do_handshake_on_connect", action="store",
default="True", dest="do_handshake_on_connect")
parser.add_option("-e", "--suppress_ragged_eofs", action="store",
default="True", dest="suppress_ragged_eofs")
return parser
if __name__ == '__main__':
parser = build_cli_parser()
opts, args = parser.parse_args(sys.argv[1:])
run_server(host=opts.host,
port=opts.port,
certfile=opts.certfile,
keyfile=opts.keyfile,
cert_reqs=opts.cert_reqs,
ssl_version=opts.ssl_version,
ca_certs=opts.ca_certs,
do_handshake_on_connect=opts.do_handshake_on_connect,
suppress_ragged_eofs=opts.suppress_ragged_eofs)

View File

@ -1,542 +0,0 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testtools import TestCase
from kmip.core.attributes import CryptographicAlgorithm
from kmip.core.attributes import CryptographicLength
from kmip.core.attributes import CryptographicUsageMask
from kmip.core.attributes import UniqueIdentifier
from kmip.core.attributes import ObjectType
from kmip.core.attributes import Name
from kmip.core.enums import AttributeType
from kmip.core.enums import CryptographicAlgorithm as CryptoAlgorithmEnum
from kmip.core.enums import CryptographicUsageMask as CryptoUsageMaskEnum
from kmip.core.enums import KeyCompressionType as KeyCompressionTypeEnum
from kmip.core.enums import KeyFormatType as KeyFormatTypeEnum
from kmip.core.enums import ObjectType as ObjectTypeEnum
from kmip.core.enums import ResultReason
from kmip.core.enums import ResultStatus
from kmip.core.enums import NameType
from kmip.core.factories.attributes import AttributeFactory
from kmip.core.messages.contents import KeyCompressionType
from kmip.core.misc import KeyFormatType
from kmip.core.objects import KeyBlock
from kmip.core.objects import KeyMaterial
from kmip.core.objects import KeyValue
from kmip.core.objects import TemplateAttribute
from kmip.core.secrets import SymmetricKey
from kmip.core.server import KMIPImpl
class TestKMIPServer(TestCase):
def setUp(self):
super(TestKMIPServer, self).setUp()
self.kmip = KMIPImpl()
self.algorithm_name = CryptoAlgorithmEnum.AES
self.key_length = 256
self.key = bytearray(range(0, 32))
self.usage_mask = CryptoUsageMaskEnum.ENCRYPT.value |\
CryptoUsageMaskEnum.DECRYPT.value
def tearDown(self):
super(TestKMIPServer, self).tearDown()
def test_create(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
attributes = self._get_attrs()
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.create(obj_type, template_attribute)
self.assertNotEqual(None, res, 'result is None')
self.assertEqual(ResultStatus.SUCCESS, res.result_status.value,
'result status did not return success')
def test_create_no_length(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
attributes = self._get_attrs()[0:2]
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.create(obj_type, template_attribute)
self.assertNotEqual(None, res, 'result is None')
attrs = res.template_attribute.attributes
self.assertEqual(ResultStatus.SUCCESS, res.result_status.value,
'result status did not return success')
self.assertTrue(self._check_attr_exists(attributes[2], attrs),
'length attribute not returned')
def test_create_no_alg(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
attributes = [self._get_attrs()[1]]
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.create(obj_type, template_attribute)
self.assertNotEqual(None, res, 'result is None')
self.assertEqual(
ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
def test_create_no_usage_mask(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
attributes = [self._get_attrs()[0]]
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.create(obj_type, template_attribute)
self.assertNotEqual(None, res, 'result is None')
self.assertEqual(
ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
def test_register(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
attributes = []
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertNotEqual(None, res, 'result is None')
self.assertEqual(ResultStatus.SUCCESS, res.result_status.value,
'result status did not return success')
def test_register_attrs_in_key_value(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
key.key_block.cryptographic_algorithm = None
key.key_block.cryptographic_length = None
key.key_block.key_value.attributes = self._get_attrs()
attributes = []
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertNotEqual(None, res, 'result is None')
self.assertEqual(ResultStatus.SUCCESS, res.result_status.value,
'result status did not return success')
def test_register_attrs_in_template(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
key.key_block.cryptographic_algorithm = None
key.key_block.cryptographic_length = None
key.key_block.key_value.attributes = []
attributes = self._get_attrs()
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertNotEqual(None, res, 'result is None')
self.assertEqual(ResultStatus.SUCCESS, res.result_status.value,
'result status did not return success')
def test_register_no_alg(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
key.key_block.cryptographic_algorithm = None
attributes = []
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.ITEM_NOT_FOUND,
res.result_reason.value,
'result reason did not match')
def test_register_alg_in_key_value_and_key_block(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
key.key_block.key_value.attributes = [self._get_alg_attr()]
attributes = []
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.INDEX_OUT_OF_BOUNDS,
res.result_reason.value,
'result reason did not match')
def test_register_alg_in_template_and_key_block(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
attributes = [self._get_alg_attr()]
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.INDEX_OUT_OF_BOUNDS,
res.result_reason.value,
'result reason did not match')
def test_register_alg_in_template_and_key_value(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
key.key_block.cryptographic_algorithm = None
key.key_block.key_value.attributes = [self._get_alg_attr()]
attributes = [self._get_alg_attr()]
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.INDEX_OUT_OF_BOUNDS,
res.result_reason.value,
'result reason did not match')
def test_register_invalid_alg(self):
unsupported_algs = (CryptoAlgorithmEnum.RSA,
CryptoAlgorithmEnum.DSA,
CryptoAlgorithmEnum.ECDSA,
CryptoAlgorithmEnum.HMAC_SHA1,
CryptoAlgorithmEnum.HMAC_SHA224,
CryptoAlgorithmEnum.HMAC_SHA256,
CryptoAlgorithmEnum.HMAC_SHA384,
CryptoAlgorithmEnum.HMAC_SHA512,
CryptoAlgorithmEnum.HMAC_MD5,
CryptoAlgorithmEnum.DH,
CryptoAlgorithmEnum.ECDH,
CryptoAlgorithmEnum.ECMQV,
CryptoAlgorithmEnum.BLOWFISH,
CryptoAlgorithmEnum.CAMELLIA,
CryptoAlgorithmEnum.CAST5,
CryptoAlgorithmEnum.IDEA,
CryptoAlgorithmEnum.MARS,
CryptoAlgorithmEnum.RC2,
CryptoAlgorithmEnum.RC4,
CryptoAlgorithmEnum.RC5,
CryptoAlgorithmEnum.SKIPJACK,
CryptoAlgorithmEnum.TWOFISH)
for alg in unsupported_algs:
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
key.key_block.cryptographic_algorithm = CryptographicAlgorithm(alg)
attributes = []
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.INVALID_FIELD,
res.result_reason.value,
'result reason did not match')
def test_register_no_length(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
key.key_block.cryptographic_length = None
attributes = []
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.ITEM_NOT_FOUND,
res.result_reason.value,
'result reason did not match')
def test_register_length_in_key_value_and_key_block(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
key.key_block.key_value.attributes = [self._get_length_attr()]
attributes = []
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.INDEX_OUT_OF_BOUNDS,
res.result_reason.value,
'result reason did not match')
def test_register_length_in_template_and_key_block(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
attributes = [self._get_length_attr()]
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.INDEX_OUT_OF_BOUNDS,
res.result_reason.value,
'result reason did not match')
def test_register_length_in_template_and_key_value(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
key.key_block.cryptographic_length = None
key.key_block.key_value.attributes = [self._get_length_attr()]
attributes = [self._get_length_attr()]
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.INDEX_OUT_OF_BOUNDS,
res.result_reason.value,
'result reason did not match')
def test_register_invalid_length(self):
unsupported_lens = (-1, 0, 2048, 5, 18)
for len in unsupported_lens:
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
key.key_block.cryptographic_length = CryptographicLength(len)
attributes = []
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.INVALID_FIELD,
res.result_reason.value,
'result reason did not match')
def test_register_no_usage_mask(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
key = self._get_symmetric_key()
key.key_block.key_value.attributes = []
attributes = []
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.ITEM_NOT_FOUND,
res.result_reason.value,
'result reason did not match')
def test_register_no_object_type(self):
obj_type = None
key = self._get_symmetric_key()
attributes = []
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertNotEqual(None, res, 'result is None')
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.ITEM_NOT_FOUND,
res.result_reason.value,
'result reason did not match')
def test_register_unsupported_object_type(self):
unsupported_types = (ObjectTypeEnum.CERTIFICATE,
ObjectTypeEnum.PUBLIC_KEY,
ObjectTypeEnum.PRIVATE_KEY,
ObjectTypeEnum.SPLIT_KEY,
ObjectTypeEnum.TEMPLATE,
ObjectTypeEnum.SECRET_DATA,
ObjectTypeEnum.OPAQUE_DATA)
for unsupported_type in unsupported_types:
obj_type = ObjectType(unsupported_type)
key = self._get_symmetric_key()
attributes = []
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertNotEqual(None, res, 'result is None')
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.INVALID_FIELD,
res.result_reason.value,
'result reason did not match')
def test_register_object_type_mismatch(self):
unsupported_types = (ObjectTypeEnum.CERTIFICATE,
ObjectTypeEnum.PUBLIC_KEY,
ObjectTypeEnum.PRIVATE_KEY,
ObjectTypeEnum.SPLIT_KEY,
ObjectTypeEnum.TEMPLATE,
ObjectTypeEnum.SECRET_DATA,
ObjectTypeEnum.OPAQUE_DATA)
for unsupported_type in unsupported_types:
obj_type = ObjectType(unsupported_type)
key = self._get_symmetric_key()
attributes = []
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.register(obj_type, template_attribute, key)
self.assertNotEqual(None, res, 'result is None')
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.INVALID_FIELD,
res.result_reason.value,
'result reason did not match')
def test_get(self):
uuid = self._create()
key_format_type = KeyFormatType(KeyFormatTypeEnum.RAW)
res = self.kmip.get(uuid, key_format_type)
self.assertEqual(ResultStatus.SUCCESS, res.result_status.value,
'result status did not return success')
def test_get_no_key_format_type(self):
uuid = self._create()
res = self.kmip.get(uuid, None)
self.assertEqual(ResultStatus.SUCCESS, res.result_status.value,
'result status did not return success')
def test_get_unknown(self):
uuids = ('some random string', UniqueIdentifier('no key here'))
for uuid in uuids:
key_format_type = KeyFormatType(KeyFormatTypeEnum.RAW)
res = self.kmip.get(uuid, key_format_type)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.ITEM_NOT_FOUND,
res.result_reason.value,
'result reason did not match')
def test_get_no_uuid(self):
self._create()
key_format_type = KeyFormatType(KeyFormatTypeEnum.RAW)
res = self.kmip.get(None, key_format_type)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
def test_get_with_key_compression(self):
uuid = self._create()
key_format_type = KeyFormatType(KeyFormatTypeEnum.RAW)
key_compression = KeyCompressionType(KeyCompressionTypeEnum.
EC_PUBLIC_KEY_TYPE_UNCOMPRESSED)
res = self.kmip.get(uuid, key_format_type, key_compression)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.KEY_COMPRESSION_TYPE_NOT_SUPPORTED,
res.result_reason.value,
'result reason did not match')
def test_destroy(self):
uuid = self._create()
key_format_type = KeyFormatType(KeyFormatTypeEnum.RAW)
res = self.kmip.get(uuid, key_format_type)
self.assertEqual(ResultStatus.SUCCESS, res.result_status.value,
'result status did not return success')
res = self.kmip.destroy(uuid)
self.assertEqual(ResultStatus.SUCCESS, res.result_status.value,
'result status did not return success')
res = self.kmip.destroy(uuid)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.ITEM_NOT_FOUND,
res.result_reason.value,
'result reason did not match')
def test_destroy_no_uuid(self):
res = self.kmip.destroy(None)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.ITEM_NOT_FOUND,
res.result_reason.value,
'result reason did not match')
def test_destroy_unknown(self):
uuids = ('some random string', UniqueIdentifier('no key here'))
for uuid in uuids:
key_format_type = KeyFormatType(KeyFormatTypeEnum.RAW)
res = self.kmip.get(uuid, key_format_type)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
res = self.kmip.destroy(uuid)
self.assertEqual(ResultStatus.OPERATION_FAILED,
res.result_status.value,
'result status did not return failed')
self.assertEqual(ResultReason.ITEM_NOT_FOUND,
res.result_reason.value,
'result reason did not match')
def _create(self):
obj_type = ObjectType(ObjectTypeEnum.SYMMETRIC_KEY)
attributes = self._get_attrs()
template_attribute = TemplateAttribute(attributes=attributes)
res = self.kmip.create(obj_type, template_attribute)
self.assertNotEqual(None, res, 'result is None')
self.assertEqual(ResultStatus.SUCCESS, res.result_status.value,
'result status did not return success')
return res.uuid
def _get_symmetric_key(self):
# only need usage attribute
attrs = [self._get_attrs()[1]]
key_format_type = KeyFormatType(KeyFormatTypeEnum.RAW)
key_material = KeyMaterial(self.key)
key_value = KeyValue(key_material, attrs)
crypto_alg = CryptographicAlgorithm(self.algorithm_name)
crypto_length = CryptographicLength(self.key_length)
usage = CryptographicUsageMask(self.usage_mask)
key_block = KeyBlock(key_format_type, None, key_value, crypto_alg,
crypto_length, usage)
return SymmetricKey(key_block)
def _get_attrs(self):
attr_factory = AttributeFactory()
algorithm = self._get_alg_attr(self.algorithm_name)
length = self._get_length_attr(self.key_length)
attribute_type = AttributeType.CRYPTOGRAPHIC_USAGE_MASK
mask_flags = [CryptoUsageMaskEnum.ENCRYPT,
CryptoUsageMaskEnum.DECRYPT]
usage_mask = attr_factory.create_attribute(attribute_type,
mask_flags)
name_value = Name.NameValue(value='TESTNAME')
name_type = Name.NameType(value=NameType.UNINTERPRETED_TEXT_STRING)
value = Name.create(name_value, name_type)
nameattr = attr_factory.create_attribute(AttributeType.NAME, value)
return [algorithm, usage_mask, length, nameattr]
def _get_alg_attr(self, alg=None):
if alg is None:
alg = self.algorithm_name
attr_factory = AttributeFactory()
attribute_type = AttributeType.CRYPTOGRAPHIC_ALGORITHM
return attr_factory.create_attribute(attribute_type, alg)
def _get_length_attr(self, length=None):
if length is None:
length = self.key_length
attr_factory = AttributeFactory()
attribute_type = AttributeType.CRYPTOGRAPHIC_LENGTH
return attr_factory.create_attribute(attribute_type, length)
def _check_attr_exists(self, attr_expected, attributes):
for attribute in attributes:
if attribute.attribute_name.value ==\
attr_expected.attribute_name.value:
return attribute.attribute_value.value ==\
attr_expected.attribute_value.value
return False
def test_locate(self):
self._create()
name_value = Name.NameValue(value='TESTNAME')
name_type = Name.NameType(value=NameType.UNINTERPRETED_TEXT_STRING)
value = Name.create(name_value, name_type)
attr_factory = AttributeFactory()
nameattr = attr_factory.create_attribute(AttributeType.NAME, value)
attrs = [nameattr]
res = self.kmip.locate(attributes=attrs)
self.assertEqual(
ResultStatus.OPERATION_FAILED,
res.result_status.value,
'locate result status did not return success')

View File

@ -1,169 +0,0 @@
# Copyright (c) 2016 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from binascii import unhexlify as hex2bin
from kmip.core.server import KMIPImpl
from kmip.services.server.processor import Processor
from kmip.services.server.kmip_protocol import KMIPProtocolFactory
class TestServerProcessor(testtools.TestCase):
"""
Integration test suite for the Kmip Server Processor.
"""
def _setTimeStamp(self, data, ref):
"""
Take raw TimeStamp data from 'ref' and update with it
the TimeStamp in 'data'.
Applied to the data blob sent by processor before comparing it
with the reference response data.
"""
time_stamp_pattern = hex2bin('4200920900000008')
try:
idx_data = data.index(time_stamp_pattern)
idx_ref = ref.index(time_stamp_pattern)
except Exception:
return data
data = data[:idx_data] + ref[idx_ref:idx_ref+16] + data[idx_data+16:]
return data
def setUp(self):
super(TestServerProcessor, self).setUp()
def tearDown(self):
super(TestServerProcessor, self).tearDown()
def test_init(self):
"""
Test that a Kmip Server Processor can be created without errors.
"""
handler = KMIPImpl()
Processor(handler)
def _integration_test_process(self, request, expected):
"""
Semi-integration test of the Request processing;
includes the encode/decode of messages and operation specific
Payloads.
WF is mocked on the socket level:
- request blob data are supplied to socket in two chunks:
-- Request tag/type/length;
-- Request Message data.
- data sent by socket are restored from the mock-calls and compared
with expected data (excluding the TimeStamp data -- TODO).
"""
socket = mock.MagicMock()
socket.recv = mock.MagicMock(side_effect=[request[:8], request[8:]])
socket.sendall = mock.MagicMock()
factory = KMIPProtocolFactory()
protocol = factory.getProtocol(socket)
handler = KMIPImpl()
processor = Processor(handler)
processor.process(protocol, protocol)
(args, kwargs) = socket.sendall.call_args
to_cmp = self._setTimeStamp(args[0], expected)
self.assertEqual(expected, to_cmp, "Unexpected error")
def test_process_discovery_versions(self):
"""
'DiscoveryVersion':
- request:
-- header with protocol version and credential authentication;
-- batch item 'Operation DiscoverVersions' without parameters
- expected:
-- response header with procotol-version, time stamp, batch count
-- batch item with two versions supported by server
"""
request = hex2bin(
'42007801000000b04200770100000088420069010000002042006a02000000040'
'00000010000000042006b0200000004000000010000000042000c010000004842'
'00230100000040420024050000000400000001000000004200250100000028420'
'099070000000a4b6d6970436c69656e740000000000004200a10700000006436f'
'75636f75000042000d0200000004000000010000000042000f010000001842005'
'c05000000040000001e000000004200790100000000')
response = hex2bin(
'42007b01000000d042007a0100000048420069010000002042006a02000000040'
'00000010000000042006b02000000040000000100000000420092090000000800'
'00000056bda8eb42000d0200000004000000010000000042000f0100000078420'
'05c05000000040000001e0000000042007f050000000400000000000000004200'
'7c0100000050420069010000002042006a0200000004000000010000000042006'
'b02000000040000000100000000420069010000002042006a0200000004000000'
'010000000042006b02000000040000000000000000')
self._integration_test_process(request, response)
def test_process_create_symmetric_key(self):
"""
'Create':
- request:
-- header with protocol version and credential authentication;
-- batch item 'Operation Create':
-- object type : symmetric key
-- attributes:
-- Cryptographic Algorithm: AES
-- Cryptographic Usage Mask
-- Cryptographic Length: 128
-- Name
- expected:
-- response header with procotol-version, time stamp, batch count
-- batch item with:
-- operation: Create;
-- result status: Success;
-- response payload:
-- object type: symmetric key;
-- UID: '1';
-- attribute template:
-- 'Unique Identifier': '1'
"""
request = hex2bin(
'42007801000001b04200770100000088420069010000002042006a02000000040'
'00000010000000042006b0200000004000000010000000042000c010000004842'
'00230100000040420024050000000400000001000000004200250100000028420'
'099070000000a4b6d6970436c69656e740000000000004200a10700000006436f'
'75636f75000042000d0200000004000000010000000042000f010000011842005'
'c0500000004000000010000000042007901000001004200570500000004000000'
'020000000042009101000000e8420008010000003042000a07000000174372797'
'0746f6772617068696320416c676f726974686d0042000b050000000400000003'
'00000000420008010000003042000a070000001843727970746f6772617068696'
'3205573616765204d61736b42000b02000000040000000c000000004200080100'
'00003042000a070000001443727970746f67726170686963204c656e677468000'
'0000042000b02000000040000008000000000420008010000003842000a070000'
'00044e616d650000000042000b0100000020420055070000000854657374204b6'
'57942005405000000040000000100000000')
response = hex2bin(
'42007b01000000e042007a0100000048420069010000002042006a02000000040'
'00000010000000042006b02000000040000000100000000420092090000000800'
'00000056c488d742000d0200000004000000010000000042000f0100000088420'
'05c0500000004000000010000000042007f050000000400000000000000004200'
'7c010000006042005705000000040000000200000000420094070000000131000'
'000000000004200910100000038420008010000003042000a0700000011556e69'
'717565204964656e7469666965720000000000000042000b07000000013100000'
'000000000')
self._integration_test_process(request, response)