Merge pull request #263 from vbnmmnbv/locate_pie

Add Locate operation support for pie client
This commit is contained in:
Peter Hamilton 2017-02-15 10:40:15 -05:00 committed by GitHub
commit cbcb5b97bf
3 changed files with 340 additions and 1 deletions

61
kmip/demos/pie/locate.py Normal file
View File

@ -0,0 +1,61 @@
# Copyright (c) 2017 Pure Storage, Inc. All Rights Reserved.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core.enums import NameType
from kmip.core.enums import Operation
from kmip.core.attributes import Name
from kmip.core.objects import Attribute
from kmip.demos import utils
from kmip.pie import client
import logging
import sys
if __name__ == '__main__':
logger = utils.build_console_logger(logging.INFO)
# Build and parse arguments
parser = utils.build_cli_parser(Operation.LOCATE)
opts, args = parser.parse_args(sys.argv[1:])
config = opts.config
name = opts.name
# Exit early if name is not specified
if name is None:
logger.error('No name provided, exiting early from demo')
sys.exit()
# Build name attribute
# TODO Push this into the AttributeFactory
attribute_name = Attribute.AttributeName('Name')
name_value = Name.NameValue(name)
name_type = Name.NameType(NameType.UNINTERPRETED_TEXT_STRING)
value = Name.create(name_value=name_value, name_type=name_type)
name_obj = Attribute(attribute_name=attribute_name, attribute_value=value)
attributes = [name_obj]
# Build the client and connect to the server
with client.ProxyKmipClient(config=config) as client:
try:
uuids = client.locate(attributes=attributes)
logger.info("Located uuids: {0}".format(uuids))
except Exception as e:
logger.error(e)

View File

@ -330,6 +330,67 @@ class ProxyKmipClient(api.KmipClient):
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
def locate(self, maximum_items=None, storage_status_mask=None,
object_group_member=None, attributes=None):
"""
Search for managed objects, depending on the attributes specified in
the request.
Args:
maximum_items (integer): Maximum number of object identifiers the
server MAY return.
storage_status_mask (integer): A bit mask that indicates whether
on-line or archived objects are to be searched.
object_group_member (ObjectGroupMember): An enumeration that
indicates the object group member type.
attributes (list): Attributes the are REQUIRED to match those in a
candidate object.
Returns:
list: The Unique Identifiers of the located objects
Raises:
ClientConnectionNotOpen: if the client connection is unusable
KmipOperationFailure: if the operation result is a failure
TypeError: if the input arguments are invalid
"""
# Check inputs
if maximum_items is not None:
if not isinstance(maximum_items, six.integer_types):
raise TypeError("maximum_items must be an integer")
if storage_status_mask is not None:
if not isinstance(storage_status_mask, six.integer_types):
raise TypeError("storage_status_mask must be an integer")
if object_group_member is not None:
if not isinstance(object_group_member, enums.ObjectGroupMember):
raise TypeError(
"object_group_member must be a ObjectGroupMember"
"enumeration")
if attributes is not None:
if not isinstance(attributes, list) or \
all(isinstance(item, cobjects.Attribute)
for item in attributes) is False:
raise TypeError(
"attributes must be a list of attributes")
# Verify that operations can be given at this time
if not self._is_open:
raise exceptions.ClientConnectionNotOpen()
# Search for managed objects and handle the results
result = self.proxy.locate(
maximum_items, storage_status_mask,
object_group_member, attributes)
status = result.result_status.value
if status == enums.ResultStatus.SUCCESS:
uids = [uuid.value for uuid in result.uuids]
return uids
else:
reason = result.result_reason.value
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
def get(self, uid):
"""
Get a managed object from a KMIP appliance.
@ -518,7 +579,7 @@ class ProxyKmipClient(api.KmipClient):
parameters_attribute = CryptographicParameters(
cryptographic_algorithm=CryptographicAlgorithm(algorithm))
# Create the symmetric key and handle the results
# Get the message authentication code and handle the results
result = self.proxy.mac(uid, parameters_attribute, data)
status = result.result_status.value

View File

@ -1158,3 +1158,220 @@ class TestProxyKmipClient(testtools.TestCase):
args = [uuid, algorithm, data]
self.assertRaises(
ClientConnectionNotOpen, client.mac, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_locate(self):
"""
Test the locate client with proper input.
"""
maximum_items = 10
storage_status_mask = 1
object_group_member = enums.ObjectGroupMember.GROUP_MEMBER_FRESH
attributes = [
obj.Attribute(
attribute_name=obj.Attribute.AttributeName('Name'),
attribute_index=obj.Attribute.AttributeIndex(0),
attribute_value=attr.Name(
name_value=attr.Name.NameValue('Test Name'),
name_type=attr.Name.NameType(
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
),
obj.Attribute(
attribute_name=obj.Attribute.AttributeName('Object Type'),
attribute_value=attr.ObjectType(
enums.ObjectType.SYMMETRIC_KEY
)
)
]
uuid0 = 'aaaaaaaa-1111-2222-3333-ffffffffffff'
uuid1 = 'bbbbbbbb-4444-5555-6666-gggggggggggg'
unique_identifiers = [attr.UniqueIdentifier(uuid0),
attr.UniqueIdentifier(uuid1)]
result = results.LocateResult(
contents.ResultStatus(enums.ResultStatus.SUCCESS),
uuids=unique_identifiers)
with ProxyKmipClient() as client:
client.proxy.locate.return_value = result
uuids = client.locate(
maximum_items, storage_status_mask,
object_group_member, attributes)
self.assertIn(uuid0, uuids)
self.assertIn(uuid1, uuids)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_locate_on_invalid_inputs(self):
"""
Test that a TypeError exception is raised when wrong type
of arguments are given to locate operation.
"""
maximum_items = 10
maximum_items_invalid = "10"
storage_status_mask = 1
storage_status_mask_invalid = '1'
object_group_member = enums.ObjectGroupMember.GROUP_MEMBER_FRESH
object_group_member_invalid = \
enums.CryptographicUsageMask.MAC_GENERATE
attributes = [
obj.Attribute(
attribute_name=obj.Attribute.AttributeName('Name'),
attribute_index=obj.Attribute.AttributeIndex(0),
attribute_value=attr.Name(
name_value=attr.Name.NameValue('Test Name'),
name_type=attr.Name.NameType(
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
),
obj.Attribute(
attribute_name=obj.Attribute.AttributeName('Object Type'),
attribute_value=attr.ObjectType(
enums.ObjectType.SYMMETRIC_KEY
)
)
]
attributes_invalid0 = 123
attributes_invalid1 = [
obj.Attribute(
attribute_name=obj.Attribute.AttributeName('Name'),
attribute_index=obj.Attribute.AttributeIndex(0),
attribute_value=attr.Name(
name_value=attr.Name.NameValue('Test Name'),
name_type=attr.Name.NameType(
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
),
123
]
uuid0 = 'aaaaaaaa-1111-2222-3333-ffffffffffff'
uuid1 = 'bbbbbbbb-4444-5555-6666-gggggggggggg'
unique_identifiers = [attr.UniqueIdentifier(uuid0),
attr.UniqueIdentifier(uuid1)]
result = results.LocateResult(
contents.ResultStatus(enums.ResultStatus.SUCCESS),
uuids=unique_identifiers)
args = [maximum_items_invalid, storage_status_mask,
object_group_member, attributes]
with ProxyKmipClient() as client:
client.proxy.locate.return_value = result
self.assertRaises(TypeError, client.locate, *args)
args = [maximum_items, storage_status_mask_invalid,
object_group_member, attributes]
with ProxyKmipClient() as client:
client.proxy.locate.return_value = result
self.assertRaises(TypeError, client.locate, *args)
args = [maximum_items, storage_status_mask,
object_group_member_invalid, attributes]
with ProxyKmipClient() as client:
client.proxy.locate.return_value = result
self.assertRaises(TypeError, client.locate, *args)
args = [maximum_items, storage_status_mask,
object_group_member, attributes_invalid0]
with ProxyKmipClient() as client:
client.proxy.locate.return_value = result
self.assertRaises(TypeError, client.locate, *args)
args = [maximum_items, storage_status_mask,
object_group_member, attributes_invalid1]
with ProxyKmipClient() as client:
client.proxy.locate.return_value = result
self.assertRaises(TypeError, client.locate, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_locate_on_operation_failure(self):
"""
Test that a KmipOperationFailure exception is raised when the
backend fails to locate.
"""
maximum_items = 10
storage_status_mask = 1
object_group_member = enums.ObjectGroupMember.GROUP_MEMBER_FRESH
attributes = [
obj.Attribute(
attribute_name=obj.Attribute.AttributeName('Name'),
attribute_index=obj.Attribute.AttributeIndex(0),
attribute_value=attr.Name(
name_value=attr.Name.NameValue('Test Name'),
name_type=attr.Name.NameType(
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
),
obj.Attribute(
attribute_name=obj.Attribute.AttributeName('Object Type'),
attribute_value=attr.ObjectType(
enums.ObjectType.SYMMETRIC_KEY
)
)
]
status = enums.ResultStatus.OPERATION_FAILED
reason = enums.ResultReason.GENERAL_FAILURE
message = "Test failure message"
result = results.OperationResult(
contents.ResultStatus(status),
contents.ResultReason(reason),
contents.ResultMessage(message))
error_msg = str(KmipOperationFailure(status, reason, message))
client = ProxyKmipClient()
client.open()
client.proxy.locate.return_value = result
args = [maximum_items, storage_status_mask,
object_group_member, attributes]
self.assertRaisesRegexp(
KmipOperationFailure, error_msg, client.locate, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_locate_on_closed(self):
"""
Test that a ClientConnectionNotOpen exception is raised when trying
to do locate on an unopened client connection.
"""
client = ProxyKmipClient()
maximum_items = 10
storage_status_mask = 1
object_group_member = enums.ObjectGroupMember.GROUP_MEMBER_FRESH
attributes = [
obj.Attribute(
attribute_name=obj.Attribute.AttributeName('Name'),
attribute_index=obj.Attribute.AttributeIndex(0),
attribute_value=attr.Name(
name_value=attr.Name.NameValue('Test Name'),
name_type=attr.Name.NameType(
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
),
obj.Attribute(
attribute_name=obj.Attribute.AttributeName('Object Type'),
attribute_value=attr.ObjectType(
enums.ObjectType.SYMMETRIC_KEY
)
)
]
args = [maximum_items, storage_status_mask,
object_group_member, attributes]
self.assertRaises(
ClientConnectionNotOpen, client.locate, *args)