Merge pull request #74 from OpenKMIP/feat/add-get-attribute-list-pie

Adding ProxyKmipClient support for the GetAttributeList operation
This commit is contained in:
Peter Hamilton 2015-09-04 10:27:10 -04:00
commit 428633f175
6 changed files with 199 additions and 0 deletions

View File

@ -0,0 +1,53 @@
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import sys
from kmip.core import enums
from kmip.demos import utils
from kmip.pie import client
if __name__ == '__main__':
# Build and parse arguments
parser = utils.build_cli_parser(enums.Operation.GET_ATTRIBUTE_LIST)
opts, args = parser.parse_args(sys.argv[1:])
config = opts.config
uid = opts.uuid
# Exit early if the UUID is not specified
if uid is None:
logging.debug('No ID provided, exiting early from demo')
sys.exit()
# Build and setup logging and needed factories
f_log = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir,
'logconfig.ini')
logging.config.fileConfig(f_log)
logger = logging.getLogger(__name__)
# Build the client and connect to the server
with client.ProxyKmipClient(config=config) as client:
try:
attribute_names = client.get_attribute_list(uid)
logger.info("Successfully retrieved {0} attribute names:".format(
len(attribute_names)))
for attribute_name in attribute_names:
logger.info("Attribute name: {0}".format(attribute_name))
except Exception as e:
logger.error(e)

View File

@ -161,6 +161,15 @@ def build_cli_parser(operation=None):
dest="format",
help=("Format in which to retrieve the secret. Supported formats "
"include: RAW, PKCS_1, PKCS_8, X_509"))
elif operation is Operation.GET_ATTRIBUTE_LIST:
parser.add_option(
"-i",
"--uuid",
action="store",
type="str",
default=None,
dest="uuid",
help="UID of a managed object")
elif operation is Operation.LOCATE:
parser.add_option(
"-n",

View File

@ -70,6 +70,17 @@ class KmipClient:
"""
pass
@abc.abstractmethod
def get_attribute_list(self, uid):
"""
Get a list of attribute names for a managed object on a KMIP appliance.
Args:
uid (string): The unique ID of the managed object whose attribute
names should be retrieved.
"""
pass
@abc.abstractmethod
def destroy(self, uid):
"""

View File

@ -309,6 +309,39 @@ class ProxyKmipClient(api.KmipClient):
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
def get_attribute_list(self, uid=None):
"""
Get the names of the attributes associated with a managed object.
If the uid is not specified, the appliance will use the ID placeholder
by default.
Args:
uid (string): The unique ID of the managed object with which the
retrieved attribute names should be associated. Optional,
defaults to None.
"""
# Check input
if uid is not None:
if not isinstance(uid, six.string_types):
raise TypeError("uid must be a string")
# Verify that operations can be given at this time
if not self._is_open:
raise exceptions.ClientConnectionNotOpen()
# Get the list of attribute names for a managed object.
result = self.proxy.get_attribute_list(uid)
status = result.result_status.enum
if status == enums.ResultStatus.SUCCESS:
attribute_names = sorted(result.names)
return attribute_names
else:
reason = result.result_reason.enum
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
def destroy(self, uid):
"""
Destroy a managed object stored by a KMIP appliance.

View File

@ -38,6 +38,9 @@ class DummyKmipClient(api.KmipClient):
def get(self, uid, *args, **kwargs):
super(DummyKmipClient, self).get(uid)
def get_attribute_list(self, uid, *args, **kwargs):
super(DummyKmipClient, self).get_attribute_list(uid)
def destroy(self, uid):
super(DummyKmipClient, self).destroy(uid)
@ -90,6 +93,13 @@ class TestKmipClient(testtools.TestCase):
dummy = DummyKmipClient()
dummy.get('uid')
def test_get_attribute_list(self):
"""
Test that the get_attribute_list method can be called without error.
"""
dummy = DummyKmipClient()
dummy.get_attribute_list('uid')
def test_destroy(self):
"""
Test that the destroy method can be called without error.

View File

@ -450,6 +450,89 @@ class TestProxyKmipClient(testtools.TestCase):
self.assertRaisesRegexp(
KmipOperationFailure, error_msg, client.get, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_attribute_list(self):
"""
Test that the attribute names of a managed object can be retrieved
with proper input.
"""
uid = 'b4faee10-aa2a-4446-8ad4-0881f3422959'
attribute_names = [
'Cryptographic Length',
'Cryptographic Algorithm',
'State',
'Digest',
'Lease Time',
'Initial Date',
'Unique Identifier',
'Name',
'Cryptographic Usage Mask',
'Object Type',
'Contact Information',
'Last Change Date']
result = results.GetAttributeListResult(
contents.ResultStatus(enums.ResultStatus.SUCCESS),
uid=uid,
names=attribute_names)
with ProxyKmipClient() as client:
client.proxy.get_attribute_list.return_value = result
result = client.get_attribute_list(uid)
client.proxy.get_attribute_list.assert_called_with(uid)
self.assertIsInstance(result, list)
self.assertItemsEqual(attribute_names, result)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_attribute_list_on_invalid_uid(self):
"""
Test that a TypeError exception is raised when trying to retrieve the
attribute names of a managed object with an invalid ID.
"""
args = [0]
with ProxyKmipClient() as client:
self.assertRaises(TypeError, client.get_attribute_list, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_attribute_list_on_closed(self):
"""
Test that a ClientConnectionNotOpen exception is raised when trying
to retrieve the attribute names of a managed object on an unopened
client connection.
"""
client = ProxyKmipClient()
args = ['aaaaaaaa-1111-2222-3333-ffffffffffff']
self.assertRaises(
ClientConnectionNotOpen, client.get_attribute_list, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_attribute_list_on_operation_failure(self):
"""
Test that a KmipOperationFailure exception is raised when the
backend fails to retrieve the attribute names of a managed object.
"""
status = enums.ResultStatus.OPERATION_FAILED
reason = enums.ResultReason.GENERAL_FAILURE
message = "Test failure message"
result = results.OperationResult(
contents.ResultStatus(status),
contents.ResultReason(reason),
contents.ResultMessage(message))
error_msg = str(KmipOperationFailure(status, reason, message))
client = ProxyKmipClient()
client.open()
client.proxy.get_attribute_list.return_value = result
args = ['id']
self.assertRaisesRegexp(
KmipOperationFailure, error_msg, client.get_attribute_list, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_destroy(self):