Adding server support for the GetAttributes operation

This change adds server support for the GetAttributes operation.
The user can specify an object ID and an attribute list to get
the values of the attributes listed from the specified object.
The user can also omit either argument; the server will default
to using the ID placeholder and all viable attributes respectively.
Only a subset of the standard attributes are supported right now.
New tests have been added to cover the new feature.

Closes #185
Closes #211
Closes #218
This commit is contained in:
Peter Hamilton 2016-12-01 18:21:07 -05:00
parent c92579d50a
commit c59cfeeef9
7 changed files with 991 additions and 193 deletions

View File

@ -31,19 +31,19 @@ class GetAttributesRequestPayload(primitives.Struct):
duplicates in the attribute name list.
Attributes:
uid: The unique ID of the managed object with which the retrieved
attributes should be associated.
unique_identifier: The unique ID of the managed object with which the
retrieved attributes should be associated.
attribute_names: A list of strings identifying the names of the
attributes associated with the managed object.
"""
def __init__(self, uid=None, attribute_names=None):
def __init__(self, unique_identifier=None, attribute_names=None):
"""
Construct a GetAttributes request payload.
Args:
uid (string): The ID of the managed object with which the
retrieved attributes should be associated. Optional, defaults
to None.
unique_identifier (string): The ID of the managed object with
which the retrieved attributes should be associated. Optional,
defaults to None.
attribute_names: A list of strings identifying the names of the
attributes associated with the managed object. Optional,
defaults to None.
@ -51,30 +51,30 @@ class GetAttributesRequestPayload(primitives.Struct):
super(GetAttributesRequestPayload, self).__init__(
enums.Tags.REQUEST_PAYLOAD)
self._uid = None
self._unique_identifier = None
self._attribute_names = list()
self.uid = uid
self.unique_identifier = unique_identifier
self.attribute_names = attribute_names
@property
def uid(self):
if self._uid:
return self._uid.value
def unique_identifier(self):
if self._unique_identifier:
return self._unique_identifier.value
else:
return self._uid
return self._unique_identifier
@uid.setter
def uid(self, value):
@unique_identifier.setter
def unique_identifier(self, value):
if value is None:
self._uid = None
self._unique_identifier = None
elif isinstance(value, six.string_types):
self._uid = primitives.TextString(
self._unique_identifier = primitives.TextString(
value=value,
tag=enums.Tags.UNIQUE_IDENTIFIER
)
else:
raise TypeError("uid must be a string")
raise TypeError("unique identifier must be a string")
@property
def attribute_names(self):
@ -125,10 +125,12 @@ class GetAttributesRequestPayload(primitives.Struct):
tstream = utils.BytearrayStream(istream.read(self.length))
if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream):
self._uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER)
self._uid.read(tstream)
self._unique_identifier = primitives.TextString(
tag=enums.Tags.UNIQUE_IDENTIFIER
)
self._unique_identifier.read(tstream)
else:
self._uid = None
self._unique_identifier = None
names = list()
while self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, tstream):
@ -150,8 +152,8 @@ class GetAttributesRequestPayload(primitives.Struct):
"""
tstream = utils.BytearrayStream()
if self._uid:
self._uid.write(tstream)
if self._unique_identifier:
self._unique_identifier.write(tstream)
for attribute_name in self._attribute_names:
attribute_name.write(tstream)
@ -161,22 +163,24 @@ class GetAttributesRequestPayload(primitives.Struct):
ostream.write(tstream.buffer)
def __repr__(self):
uid = "uid={0}".format(self.uid)
unique_identifier = "unique_identifier={0}".format(
self.unique_identifier
)
attribute_names = "attribute_names={0}".format(self.attribute_names)
return "GetAttributesRequestPayload({0}, {1})".format(
uid,
unique_identifier,
attribute_names
)
def __str__(self):
return str({
'uid': self.uid,
'unique_identifier': self.unique_identifier,
'attribute_names': self.attribute_names
})
def __eq__(self, other):
if isinstance(other, GetAttributesRequestPayload):
if self.uid == other.uid:
if self.unique_identifier == other.unique_identifier:
if set(self.attribute_names) == set(other.attribute_names):
return True
else:
@ -202,49 +206,49 @@ class GetAttributesResponsePayload(primitives.Struct):
associated with the aforementioned managed object.
Attributes:
uid: The unique ID of the managed object with which the retrieved
attributes should be associated.
unique_identifier: The unique ID of the managed object with which
the retrieved attributes should be associated.
attributes: The list of attributes associated with managed object
identified by the uid above.
identified by the unique identifier above.
"""
def __init__(self, uid=None, attributes=None):
def __init__(self, unique_identifier=None, attributes=None):
"""
Construct a GetAttributes response payload.
Args:
uid (string): The ID of the managed object with which the
retrieved attributes should be associated. Optional, defaults
to None.
unique_identifier (string): The ID of the managed object with
which the retrieved attributes should be associated. Optional,
defaults to None.
attributes (list): A list of attribute structures associated with
the managed object. Optional, defaults to None.
"""
super(GetAttributesResponsePayload, self).__init__(
enums.Tags.RESPONSE_PAYLOAD)
self._uid = None
self._unique_identifier = None
self._attributes = list()
self.uid = uid
self.unique_identifier = unique_identifier
self.attributes = attributes
@property
def uid(self):
if self._uid:
return self._uid.value
def unique_identifier(self):
if self._unique_identifier:
return self._unique_identifier.value
else:
return self._uid
return self._unique_identifier
@uid.setter
def uid(self, value):
@unique_identifier.setter
def unique_identifier(self, value):
if value is None:
self._uid = None
self._unique_identifier = None
elif isinstance(value, six.string_types):
self._uid = primitives.TextString(
self._unique_identifier = primitives.TextString(
value=value,
tag=enums.Tags.UNIQUE_IDENTIFIER
)
else:
raise TypeError("uid must be a string")
raise TypeError("unique identifier must be a string")
@property
def attributes(self):
@ -279,12 +283,14 @@ class GetAttributesResponsePayload(primitives.Struct):
tstream = utils.BytearrayStream(istream.read(self.length))
if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream):
uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER)
uid.read(tstream)
self.uid = uid.value
unique_identifier = primitives.TextString(
tag=enums.Tags.UNIQUE_IDENTIFIER
)
unique_identifier.read(tstream)
self.unique_identifier = unique_identifier.value
else:
raise exceptions.InvalidKmipEncoding(
"expected GetAttributes response uid not found"
"expected GetAttributes response unique identifier not found"
)
self._attributes = list()
@ -306,11 +312,11 @@ class GetAttributesResponsePayload(primitives.Struct):
"""
tstream = utils.BytearrayStream()
if self._uid:
self._uid.write(tstream)
if self._unique_identifier:
self._unique_identifier.write(tstream)
else:
raise exceptions.InvalidField(
"The GetAttributes response uid is required."
"The GetAttributes response unique identifier is required."
)
for attribute in self._attributes:
@ -321,16 +327,24 @@ class GetAttributesResponsePayload(primitives.Struct):
ostream.write(tstream.buffer)
def __repr__(self):
uid = "uid={0}".format(self.uid)
unique_identifier = "unique_identifier={0}".format(
self.unique_identifier
)
names = "attributes={0}".format(self.attributes)
return "GetAttributesResponsePayload({0}, {1})".format(uid, names)
return "GetAttributesResponsePayload({0}, {1})".format(
unique_identifier,
names
)
def __str__(self):
return str({'uid': self.uid, 'attributes': self.attributes})
return str({
'unique_identifier': self.unique_identifier,
'attributes': self.attributes
})
def __eq__(self, other):
if isinstance(other, GetAttributesResponsePayload):
if self.uid != other.uid:
if self.unique_identifier != other.unique_identifier:
return False
if len(self._attributes) != len(other._attributes):
return False

View File

@ -825,11 +825,10 @@ class TextString(Base):
def __validate(self):
if self.value is not None:
data_type = type(self.value)
if data_type is not str:
if not isinstance(self.value, six.string_types):
msg = ErrorStrings.BAD_EXP_RECV
raise TypeError(msg.format('TextString', 'value', str,
data_type))
type(self.value)))
def __repr__(self):
return "{0}(value={1})".format(type(self).__name__, repr(self.value))

View File

@ -29,6 +29,8 @@ import kmip
from kmip.core import attributes
from kmip.core import enums
from kmip.core import exceptions
from kmip.core.factories import attributes as attribute_factory
from kmip.core.factories import secrets
from kmip.core.messages import contents
@ -40,6 +42,7 @@ from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import get_attributes
from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import register
@ -492,7 +495,6 @@ class KmipEngine(object):
unique_identifier
)
)
self._logger.exception(e)
raise exceptions.ItemNotFound(
"Could not locate object: {0}".format(unique_identifier)
)
@ -624,6 +626,148 @@ class KmipEngine(object):
return attributes
def _get_attributes_from_managed_object(self, managed_object, attr_names):
"""
Given a kmip.pie object and a list of attribute names, attempt to get
all of the existing attribute values from the object.
"""
attr_factory = attribute_factory.AttributeFactory()
retrieved_attributes = list()
if not attr_names:
attr_names = self._attribute_policy.get_all_attribute_names()
for attribute_name in attr_names:
object_type = managed_object._object_type
if not self._attribute_policy.is_attribute_supported(
attribute_name
):
continue
if self._attribute_policy.is_attribute_applicable_to_object_type(
attribute_name,
object_type
):
attribute_value = self._get_attribute_from_managed_object(
managed_object,
attribute_name
)
if attribute_value is not None:
if self._attribute_policy.is_attribute_multivalued(
attribute_name
):
for count, value in enumerate(attribute_value):
attribute = attr_factory.create_attribute(
enums.AttributeType(attribute_name),
value,
count
)
retrieved_attributes.append(attribute)
else:
attribute = attr_factory.create_attribute(
enums.AttributeType(attribute_name),
attribute_value
)
retrieved_attributes.append(attribute)
return retrieved_attributes
def _get_attribute_from_managed_object(self, managed_object, attr_name):
"""
Get the attribute value from the kmip.pie managed object.
"""
if attr_name == 'Unique Identifier':
return str(managed_object.unique_identifier)
elif attr_name == 'Name':
names = list()
for name in managed_object.names:
name = attributes.Name(
attributes.Name.NameValue(name),
attributes.Name.NameType(
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
names.append(name)
return names
elif attr_name == 'Object Type':
return managed_object._object_type
elif attr_name == 'Cryptographic Algorithm':
return managed_object.cryptographic_algorithm
elif attr_name == 'Cryptographic Length':
return managed_object.cryptographic_length
elif attr_name == 'Cryptographic Parameters':
return None
elif attr_name == 'Cryptographic Domain Parameters':
return None
elif attr_name == 'Certificate Type':
return managed_object.certificate_type
elif attr_name == 'Certificate Length':
return None
elif attr_name == 'X.509 Certificate Identifier':
return None
elif attr_name == 'X.509 Certificate Subject':
return None
elif attr_name == 'X.509 Certificate Issuer':
return None
elif attr_name == 'Certificate Identifier':
return None
elif attr_name == 'Certificate Subject':
return None
elif attr_name == 'Certificate Issuer':
return None
elif attr_name == 'Digital Signature Algorithm':
return None
elif attr_name == 'Digest':
return None
elif attr_name == 'Operation Policy Name':
return managed_object.operation_policy_name
elif attr_name == 'Cryptographic Usage Mask':
return managed_object.cryptographic_usage_masks
elif attr_name == 'Lease Time':
return None
elif attr_name == 'Usage Limits':
return None
elif attr_name == 'State':
return managed_object.state
elif attr_name == 'Initial Date':
return None
elif attr_name == 'Activation Date':
return None
elif attr_name == 'Process Start Date':
return None
elif attr_name == 'Protect Stop Date':
return None
elif attr_name == 'Deactivation Date':
return None
elif attr_name == 'Destroy Date':
return None
elif attr_name == 'Compromise Occurrence Date':
return None
elif attr_name == 'Compromise Date':
return None
elif attr_name == 'Revocation Reason':
return None
elif attr_name == 'Archive Date':
return None
elif attr_name == 'Object Group':
return None
elif attr_name == 'Fresh':
return None
elif attr_name == 'Link':
return None
elif attr_name == 'Application Specific Information':
return None
elif attr_name == 'Contact Information':
return None
elif attr_name == 'Last Change Date':
return None
else:
# Since custom attribute names are possible, just return None
# for unrecognized attributes. This satisfies the spec.
return None
def _set_attributes_on_managed_object(self, managed_object, attributes):
"""
Given a kmip.pie object and a dictionary of attributes, attempt to set
@ -761,6 +905,8 @@ class KmipEngine(object):
return self._process_register(payload)
elif operation == enums.Operation.GET:
return self._process_get(payload)
elif operation == enums.Operation.GET_ATTRIBUTES:
return self._process_get_attributes(payload)
elif operation == enums.Operation.DESTROY:
return self._process_destroy(payload)
elif operation == enums.Operation.QUERY:
@ -1187,6 +1333,47 @@ class KmipEngine(object):
return response_payload
@_kmip_version_supported('1.0')
def _process_get_attributes(self, payload):
self._logger.info("Processing operation: GetAttributes")
if payload.unique_identifier:
unique_identifier = payload.unique_identifier
else:
unique_identifier = self._id_placeholder
object_type = self._get_object_type(unique_identifier)
managed_object = self._data_session.query(object_type).filter(
object_type.unique_identifier == unique_identifier
).one()
# Determine if the request should be carried out under the object's
# operation policy. If not, feign ignorance of the object.
is_allowed = self._is_allowed_by_operation_policy(
managed_object.operation_policy_name,
self._client_identity,
managed_object._owner,
managed_object._object_type,
enums.Operation.GET_ATTRIBUTES
)
if not is_allowed:
raise exceptions.ItemNotFound(
"Could not locate object: {0}".format(unique_identifier)
)
attrs = self._get_attributes_from_managed_object(
managed_object,
payload.attribute_names
)
response_payload = get_attributes.GetAttributesResponsePayload(
unique_identifier=unique_identifier,
attributes=attrs
)
return response_payload
@_kmip_version_supported('1.0')
def _process_activate(self, payload):
self._logger.info("Processing operation: Activate")
@ -1308,6 +1495,7 @@ class KmipEngine(object):
contents.Operation(enums.Operation.CREATE_KEY_PAIR),
contents.Operation(enums.Operation.REGISTER),
contents.Operation(enums.Operation.GET),
contents.Operation(enums.Operation.GET_ATTRIBUTES),
contents.Operation(enums.Operation.DESTROY),
contents.Operation(enums.Operation.QUERY)
])

View File

@ -1146,3 +1146,12 @@ class AttributePolicy(object):
# TODO (peterhamilton) Handle multivalue swap between certificate types
rule_set = self._attribute_rule_sets.get(attribute)
return rule_set.multiple_instances_permitted
def get_all_attribute_names(self):
"""
Get a list of all supported attribute names.
Returns:
list: A list of string attribute names.
"""
return self._attribute_rule_sets.keys()

View File

@ -46,7 +46,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
b'\x69\x6F\x6E\x00\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x09'
b'\x78\x2D\x50\x75\x72\x70\x6F\x73\x65\x00\x00\x00\x00\x00\x00\x00'
)
self.encoding_sans_uid = utils.BytearrayStream(
self.encoding_sans_unique_identifier = utils.BytearrayStream(
b'\x42\x00\x79\x01\x00\x00\x00\x78\x42\x00\x0A\x07\x00\x00\x00\x0C'
b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00'
b'\x42\x00\x0A\x07\x00\x00\x00\x20\x41\x70\x70\x6C\x69\x63\x61\x74'
@ -66,7 +66,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
b'\x42\x00\x79\x01\x00\x00\x00\x00'
)
self.uid = '1703250b-4d40-4de2-93a0-c494a1d4ae40'
self.unique_identifier = '1703250b-4d40-4de2-93a0-c494a1d4ae40'
self.attribute_names = [
'Object Group',
'Application Specific Information',
@ -90,41 +90,41 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
valid value.
"""
get_attributes.GetAttributesRequestPayload(
'test-uid',
'test-unique-identifier',
['test-attribute-name-1', 'test-attribute-name-2']
)
def test_uid(self):
def test_unique_identifier(self):
"""
Test that the uid attribute of a GetAttributes request payload can
be properly set and retrieved.
Test that the unique_identifier attribute of a GetAttributes request
payload can be properly set and retrieved.
"""
payload = get_attributes.GetAttributesRequestPayload()
self.assertIsNone(payload.uid)
self.assertIsNone(payload._uid)
self.assertIsNone(payload.unique_identifier)
self.assertIsNone(payload._unique_identifier)
payload.uid = 'test-uid'
payload.unique_identifier = 'test-unique-identifier'
self.assertEqual('test-uid', payload.uid)
self.assertEqual('test-unique-identifier', payload.unique_identifier)
self.assertEqual(
primitives.TextString(
value='test-uid',
value='test-unique-identifier',
tag=enums.Tags.UNIQUE_IDENTIFIER
),
payload._uid
payload._unique_identifier
)
def test_uid_with_invalid_value(self):
def test_unique_identifier_with_invalid_value(self):
"""
Test that a TypeError is raised when an invalid ID is used to set
the uid attribute of a GetAttributes request payload.
the unique_identifier attribute of a GetAttributes request payload.
"""
payload = get_attributes.GetAttributesRequestPayload()
args = (payload, 'uid', 0)
args = (payload, 'unique_identifier', 0)
self.assertRaisesRegexp(
TypeError,
"uid must be a string",
"unique identifier must be a string",
setattr,
*args
)
@ -239,18 +239,18 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
"""
payload = get_attributes.GetAttributesRequestPayload()
self.assertEqual(None, payload._uid)
self.assertEqual(None, payload._unique_identifier)
self.assertEqual(list(), payload._attribute_names)
payload.read(self.full_encoding)
self.assertEqual(self.uid, payload.uid)
self.assertEqual(self.unique_identifier, payload.unique_identifier)
self.assertEqual(
primitives.TextString(
value=self.uid,
value=self.unique_identifier,
tag=enums.Tags.UNIQUE_IDENTIFIER
),
payload._uid
payload._unique_identifier
)
self.assertEqual(
set(self.attribute_names),
@ -265,20 +265,20 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
payload._attribute_names
)
def test_read_with_no_uid(self):
def test_read_with_no_unique_identifier(self):
"""
Test that a GetAttributes request payload with no ID can be read
from a data stream.
"""
payload = get_attributes.GetAttributesRequestPayload()
self.assertEqual(None, payload._uid)
self.assertEqual(None, payload._unique_identifier)
self.assertEqual(list(), payload._attribute_names)
payload.read(self.encoding_sans_uid)
payload.read(self.encoding_sans_unique_identifier)
self.assertEqual(None, payload.uid)
self.assertEqual(None, payload._uid)
self.assertEqual(None, payload.unique_identifier)
self.assertEqual(None, payload._unique_identifier)
self.assertEqual(
set(self.attribute_names),
set(payload.attribute_names)
@ -299,18 +299,18 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
"""
payload = get_attributes.GetAttributesRequestPayload()
self.assertEqual(None, payload._uid)
self.assertEqual(None, payload._unique_identifier)
self.assertEqual(list(), payload._attribute_names)
payload.read(self.encoding_sans_attribute_names)
self.assertEqual(self.uid, payload.uid)
self.assertEqual(self.unique_identifier, payload.unique_identifier)
self.assertEqual(
primitives.TextString(
value=self.uid,
value=self.unique_identifier,
tag=enums.Tags.UNIQUE_IDENTIFIER
),
payload._uid
payload._unique_identifier
)
self.assertEqual(list(), payload.attribute_names)
self.assertEqual(list(), payload._attribute_names)
@ -322,13 +322,13 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
"""
payload = get_attributes.GetAttributesRequestPayload()
self.assertEqual(None, payload._uid)
self.assertEqual(None, payload._unique_identifier)
self.assertEqual(list(), payload._attribute_names)
payload.read(self.empty_encoding)
self.assertEqual(None, payload.uid)
self.assertEqual(None, payload._uid)
self.assertEqual(None, payload.unique_identifier)
self.assertEqual(None, payload._unique_identifier)
self.assertEqual(list(), payload.attribute_names)
self.assertEqual(list(), payload._attribute_names)
@ -338,7 +338,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
stream.
"""
payload = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
stream = utils.BytearrayStream()
@ -347,7 +347,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
self.assertEqual(len(self.full_encoding), len(stream))
self.assertEqual(str(self.full_encoding), str(stream))
def test_write_with_no_uid(self):
def test_write_with_no_unique_identifier(self):
"""
Test that a GetAttributes request payload with no ID can be written
to a data stream.
@ -359,8 +359,14 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
stream = utils.BytearrayStream()
payload.write(stream)
self.assertEqual(len(self.encoding_sans_uid), len(stream))
self.assertEqual(str(self.encoding_sans_uid), str(stream))
self.assertEqual(
len(self.encoding_sans_unique_identifier),
len(stream)
)
self.assertEqual(
str(self.encoding_sans_unique_identifier),
str(stream)
)
def test_write_with_no_attribute_names(self):
"""
@ -368,7 +374,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
can be written to a data stream.
"""
payload = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
None
)
stream = utils.BytearrayStream()
@ -394,21 +400,23 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
Test that repr can be applied to a GetAttributes request payload.
"""
payload = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
uid = "uid={0}".format(payload.uid)
unique_identifier = "unique_identifier={0}".format(
payload.unique_identifier
)
attribute_names = "attribute_names={0}".format(
payload.attribute_names
)
expected = "GetAttributesRequestPayload({0}, {1})".format(
uid,
unique_identifier,
attribute_names
)
observed = repr(payload)
self.assertEqual(expected, observed)
def test_repr_with_no_uid(self):
def test_repr_with_no_unique_identifier(self):
"""
Test that repr can be applied to a GetAttributes request payload with
no ID.
@ -417,12 +425,14 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
None,
self.attribute_names
)
uid = "uid={0}".format(payload.uid)
unique_identifier = "unique_identifier={0}".format(
payload.unique_identifier
)
attribute_names = "attribute_names={0}".format(
payload.attribute_names
)
expected = "GetAttributesRequestPayload({0}, {1})".format(
uid,
unique_identifier,
attribute_names
)
observed = repr(payload)
@ -434,15 +444,17 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
no attribute names.
"""
payload = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
None
)
uid = "uid={0}".format(payload.uid)
unique_identifier = "unique_identifier={0}".format(
payload.unique_identifier
)
attribute_names = "attribute_names={0}".format(
payload.attribute_names
)
expected = "GetAttributesRequestPayload({0}, {1})".format(
uid,
unique_identifier,
attribute_names
)
observed = repr(payload)
@ -457,12 +469,14 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
None,
None
)
uid = "uid={0}".format(payload.uid)
unique_identifier = "unique_identifier={0}".format(
payload.unique_identifier
)
attribute_names = "attribute_names={0}".format(
payload.attribute_names
)
expected = "GetAttributesRequestPayload({0}, {1})".format(
uid,
unique_identifier,
attribute_names
)
observed = repr(payload)
@ -473,11 +487,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
Test that str can be applied to a GetAttributes request payload.
"""
payload = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
expected = str({
'uid': self.uid,
'unique_identifier': self.unique_identifier,
'attribute_names': self.attribute_names
})
observed = str(payload)
@ -493,7 +507,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
self.attribute_names
)
expected = str({
'uid': None,
'unique_identifier': None,
'attribute_names': self.attribute_names
})
observed = str(payload)
@ -505,11 +519,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
no attribute names.
"""
payload = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
None
)
expected = str({
'uid': self.uid,
'unique_identifier': self.unique_identifier,
'attribute_names': list()
})
observed = str(payload)
@ -525,7 +539,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
None
)
expected = str({
'uid': None,
'unique_identifier': None,
'attribute_names': list()
})
observed = str(payload)
@ -537,11 +551,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
GetAttributes request payloads with the same data.
"""
a = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
b = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
@ -555,25 +569,25 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
but with different attribute name orderings.
"""
a = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
self.attribute_names.reverse()
b = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
self.assertTrue(a == b)
self.assertTrue(b == a)
def test_equal_on_not_equal_uid(self):
def test_equal_on_not_equal_unique_identifier(self):
"""
Test that the equality operator returns False when comparing two
GetAttributes request payloads with different IDs.
"""
a = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
b = get_attributes.GetAttributesRequestPayload(
@ -590,11 +604,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
GetAttributes request payloads with different attribute names.
"""
a = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
b = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
None
)
@ -608,7 +622,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
payload.
"""
a = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
b = "invalid"
@ -622,24 +636,24 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
two GetAttributes request payloads with the same internal data.
"""
a = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
b = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
self.assertFalse(a != b)
self.assertFalse(b != a)
def test_not_equal_on_not_equal_uid(self):
def test_not_equal_on_not_equal_unique_identifier(self):
"""
Test that the inequality operator returns True when comparing two
GetAttributes request payloads with different IDs.
"""
a = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
b = get_attributes.GetAttributesRequestPayload(
@ -656,11 +670,11 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
GetAttributes request payloads with different attribute names.
"""
a = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
b = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
None
)
@ -674,7 +688,7 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
payload.
"""
a = get_attributes.GetAttributesRequestPayload(
self.uid,
self.unique_identifier,
self.attribute_names
)
b = "invalid"
@ -715,7 +729,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
b'\x42\x00\x0B\x07\x00\x00\x00\x0D\x64\x65\x6D\x6F\x6E\x73\x74\x72'
b'\x61\x74\x69\x6F\x6E\x00\x00\x00'
)
self.encoding_sans_uid = utils.BytearrayStream(
self.encoding_sans_unique_identifier = utils.BytearrayStream(
b'\x42\x00\x7C\x01\x00\x00\x01\x00\x42\x00\x08\x01\x00\x00\x00\x28'
b'\x42\x00\x0A\x07\x00\x00\x00\x0C\x4F\x62\x6A\x65\x63\x74\x20\x47'
b'\x72\x6F\x75\x70\x00\x00\x00\x00\x42\x00\x0B\x07\x00\x00\x00\x06'
@ -741,7 +755,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
b'\x61\x65\x34\x30\x00\x00\x00\x00'
)
self.uid = '1703250b-4d40-4de2-93a0-c494a1d4ae40'
self.unique_identifier = '1703250b-4d40-4de2-93a0-c494a1d4ae40'
self.attributes = [
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
@ -785,41 +799,41 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
valid value.
"""
get_attributes.GetAttributesResponsePayload(
'test-uid',
'test-unique-identifier',
[objects.Attribute(), objects.Attribute()]
)
def test_uid(self):
def test_unique_identifier(self):
"""
Test that the uid attribute of a GetAttributes response payload can
be properly set and retrieved.
Test that the unique_identifier attribute of a GetAttributes response
payload can be properly set and retrieved.
"""
payload = get_attributes.GetAttributesResponsePayload()
self.assertIsNone(payload.uid)
self.assertIsNone(payload._uid)
self.assertIsNone(payload.unique_identifier)
self.assertIsNone(payload._unique_identifier)
payload.uid = 'test-uid'
payload.unique_identifier = 'test-unique-identifier'
self.assertEqual('test-uid', payload.uid)
self.assertEqual('test-unique-identifier', payload.unique_identifier)
self.assertEqual(
primitives.TextString(
value='test-uid',
value='test-unique-identifier',
tag=enums.Tags.UNIQUE_IDENTIFIER
),
payload._uid
payload._unique_identifier
)
def test_uid_with_invalid_value(self):
def test_unique_identifier_with_invalid_value(self):
"""
Test that a TypeError is raised when an invalid ID is used to set
the uid attribute of a GetAttributes response payload.
the unique_identifier attribute of a GetAttributes response payload.
"""
payload = get_attributes.GetAttributesResponsePayload()
args = (payload, 'uid', 0)
args = (payload, 'unique_identifier', 0)
self.assertRaisesRegexp(
TypeError,
"uid must be a string",
"unique identifier must be a string",
setattr,
*args
)
@ -885,18 +899,18 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
"""
payload = get_attributes.GetAttributesResponsePayload()
self.assertEqual(None, payload._uid)
self.assertEqual(None, payload._unique_identifier)
self.assertEqual(list(), payload._attributes)
payload.read(self.full_encoding)
self.assertEqual(self.uid, payload.uid)
self.assertEqual(self.unique_identifier, payload.unique_identifier)
self.assertEqual(
primitives.TextString(
value=self.uid,
value=self.unique_identifier,
tag=enums.Tags.UNIQUE_IDENTIFIER
),
payload._uid
payload._unique_identifier
)
self.assertEqual(
len(self.attributes),
@ -908,20 +922,20 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
payload._attributes
)
def test_read_with_no_uid(self):
def test_read_with_no_unique_identifier(self):
"""
Test that an InvalidKmipEncoding error gets raised when attempting to
read a GetAttributes response encoding with no ID data.
"""
payload = get_attributes.GetAttributesResponsePayload()
self.assertEqual(None, payload._uid)
self.assertEqual(None, payload._unique_identifier)
self.assertEqual(list(), payload._attributes)
args = (self.encoding_sans_uid, )
args = (self.encoding_sans_unique_identifier, )
self.assertRaisesRegexp(
exceptions.InvalidKmipEncoding,
"expected GetAttributes response uid not found",
"expected GetAttributes response unique identifier not found",
payload.read,
*args
)
@ -933,18 +947,18 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
"""
payload = get_attributes.GetAttributesResponsePayload()
self.assertEqual(None, payload._uid)
self.assertEqual(None, payload._unique_identifier)
self.assertEqual(list(), payload._attributes)
payload.read(self.encoding_sans_attributes)
self.assertEqual(self.uid, payload.uid)
self.assertEqual(self.unique_identifier, payload.unique_identifier)
self.assertEqual(
primitives.TextString(
value=self.uid,
value=self.unique_identifier,
tag=enums.Tags.UNIQUE_IDENTIFIER
),
payload._uid
payload._unique_identifier
)
self.assertEqual(list(), payload.attributes)
self.assertEqual(list(), payload._attributes)
@ -955,7 +969,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
stream.
"""
payload = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
stream = utils.BytearrayStream()
@ -964,7 +978,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
self.assertEqual(len(self.full_encoding), len(stream))
self.assertEqual(str(self.full_encoding), str(stream))
def test_write_with_no_uid(self):
def test_write_with_no_unique_identifier(self):
"""
Test that a GetAttributes request payload with no ID can be written
to a data stream.
@ -978,7 +992,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
args = (stream, )
self.assertRaisesRegexp(
exceptions.InvalidField,
"The GetAttributes response uid is required.",
"The GetAttributes response unique identifier is required.",
payload.write,
*args
)
@ -989,7 +1003,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
data can be written to a data stream.
"""
payload = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
None
)
stream = utils.BytearrayStream()
@ -1003,15 +1017,17 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
Test that repr can be applied to a GetAttributes response payload.
"""
payload = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
uid = "uid={0}".format(payload.uid)
unique_identifier = "unique_identifier={0}".format(
payload.unique_identifier
)
payload_attributes = "attributes={0}".format(
payload.attributes
)
expected = "GetAttributesResponsePayload({0}, {1})".format(
uid,
unique_identifier,
payload_attributes
)
observed = repr(payload)
@ -1022,11 +1038,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
Test that str can be applied to a GetAttributes response payload.
"""
payload = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
expected = str({
'uid': self.uid,
'unique_identifier': self.unique_identifier,
'attributes': self.attributes
})
observed = str(payload)
@ -1038,24 +1054,24 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
GetAttributes response payloads with the same data.
"""
a = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
b = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
self.assertTrue(a == b)
self.assertTrue(b == a)
def test_equal_on_not_equal_uid(self):
def test_equal_on_not_equal_unique_identifier(self):
"""
Test that the equality operator returns False when comparing two
GetAttributes response payloads with different data.
"""
a = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
b = get_attributes.GetAttributesResponsePayload(
@ -1072,13 +1088,13 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
GetAttributes response payloads with different data.
"""
a = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
reversed_attributes = copy.deepcopy(self.attributes)
reversed_attributes.reverse()
b = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
reversed_attributes
)
@ -1091,11 +1107,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
GetAttributes response payloads with different data.
"""
a = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
b = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
list()
)
@ -1108,11 +1124,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
GetAttributes response payloads with different data.
"""
a = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
None
)
b = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
@ -1126,7 +1142,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
payload.
"""
a = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
b = 'invalid'
@ -1140,24 +1156,24 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
two GetAttributes response payloads with the same internal data.
"""
a = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
b = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
self.assertFalse(a != b)
self.assertFalse(b != a)
def test_not_equal_on_not_equal_uid(self):
def test_not_equal_on_not_equal_unique_identifier(self):
"""
Test that the inequality operator returns True when comparing two
GetAttributes request payloads with different data.
"""
a = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
b = get_attributes.GetAttributesResponsePayload(
@ -1174,13 +1190,13 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
GetAttributes response payloads with different data.
"""
a = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
reversed_attributes = copy.deepcopy(self.attributes)
reversed_attributes.reverse()
b = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
reversed_attributes
)
@ -1193,11 +1209,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
GetAttributes response payloads with different data.
"""
a = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
b = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
list()
)
@ -1210,11 +1226,11 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
GetAttributes response payloads with different data.
"""
a = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
None
)
b = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
@ -1228,7 +1244,7 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
payload.
"""
a = get_attributes.GetAttributesResponsePayload(
self.uid,
self.unique_identifier,
self.attributes
)
b = "invalid"

View File

@ -43,6 +43,7 @@ from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import get_attributes
from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import register
@ -859,6 +860,7 @@ class TestKmipEngine(testtools.TestCase):
e._process_create_key_pair = mock.MagicMock()
e._process_register = mock.MagicMock()
e._process_get = mock.MagicMock()
e._process_get_attributes = mock.MagicMock()
e._process_destroy = mock.MagicMock()
e._process_query = mock.MagicMock()
e._process_discover_versions = mock.MagicMock()
@ -867,6 +869,7 @@ class TestKmipEngine(testtools.TestCase):
e._process_operation(enums.Operation.CREATE_KEY_PAIR, None)
e._process_operation(enums.Operation.REGISTER, None)
e._process_operation(enums.Operation.GET, None)
e._process_operation(enums.Operation.GET_ATTRIBUTES, None)
e._process_operation(enums.Operation.DESTROY, None)
e._process_operation(enums.Operation.QUERY, None)
e._process_operation(enums.Operation.DISCOVER_VERSIONS, None)
@ -875,6 +878,7 @@ class TestKmipEngine(testtools.TestCase):
e._process_create_key_pair.assert_called_with(None)
e._process_register.assert_called_with(None)
e._process_get.assert_called_with(None)
e._process_get_attributes.assert_called_with(None)
e._process_destroy.assert_called_with(None)
e._process_query.assert_called_with(None)
e._process_discover_versions.assert_called_with(None)
@ -945,7 +949,6 @@ class TestKmipEngine(testtools.TestCase):
e._logger.warning.assert_called_once_with(
"Could not identify object type for object: 1"
)
self.assertTrue(e._logger.exception.called)
def test_get_object_type_multiple_objects(self):
"""
@ -1352,6 +1355,345 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_get_attributes_from_managed_object(self):
"""
Test that multiple attributes can be retrieved from a given managed
object.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
symmetric_key = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b'',
masks=[enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT]
)
symmetric_key.names = ['Name 1', 'Name 2']
e._data_session.add(symmetric_key)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
result = e._get_attributes_from_managed_object(
symmetric_key,
['Unique Identifier',
'Name',
'Cryptographic Algorithm',
'Cryptographic Length',
'Cryptographic Usage Mask',
'invalid']
)
attribute_factory = factory.AttributeFactory()
self.assertEqual(6, len(result))
attribute = attribute_factory.create_attribute(
enums.AttributeType.UNIQUE_IDENTIFIER,
'1'
)
self.assertIn(attribute, result)
attribute = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
self.assertIn(attribute, result)
attribute = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
0
)
self.assertIn(attribute, result)
attribute = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT]
)
self.assertIn(attribute, result)
def test_get_attribute_from_managed_object(self):
"""
Test that an attribute can be retrieved from a given managed object.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
symmetric_key = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b'',
masks=[enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT]
)
certificate = pie_objects.X509Certificate(
b''
)
opaque_object = pie_objects.OpaqueObject(
b'',
enums.OpaqueDataType.NONE
)
e._data_session.add(symmetric_key)
e._data_session.add(certificate)
e._data_session.add(opaque_object)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
result = e._get_attribute_from_managed_object(
symmetric_key,
'Unique Identifier'
)
self.assertEqual('1', result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Name'
)
self.assertEqual(
[attributes.Name(
attributes.Name.NameValue('Symmetric Key'),
attributes.Name.NameType(
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)],
result
)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Object Type'
)
self.assertEqual(enums.ObjectType.SYMMETRIC_KEY, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Cryptographic Algorithm'
)
self.assertEqual(enums.CryptographicAlgorithm.AES, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Cryptographic Length'
)
self.assertEqual(0, result)
result = e._get_attribute_from_managed_object(
certificate,
'Cryptographic Parameters'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
certificate,
'Cryptographic Domain Parameters'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
certificate,
'Certificate Type'
)
self.assertEqual(enums.CertificateTypeEnum.X_509, result)
result = e._get_attribute_from_managed_object(
certificate,
'Certificate Length'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
certificate,
'X.509 Certificate Identifier'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
certificate,
'X.509 Certificate Subject'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
certificate,
'X.509 Certificate Issuer'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
certificate,
'Certificate Identifier'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
certificate,
'Certificate Subject'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
certificate,
'Certificate Issuer'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
certificate,
'Digital Signature Algorithm'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
opaque_object,
'Digest'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Operation Policy Name'
)
self.assertEqual('default', result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Cryptographic Usage Mask'
)
self.assertEqual(
[enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT],
result
)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Lease Time'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Usage Limits'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'State'
)
self.assertEqual(enums.State.PRE_ACTIVE, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Initial Date'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Activation Date'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Process Start Date'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Protect Stop Date'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Deactivation Date'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Destroy Date'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Compromise Occurrence Date'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Compromise Date'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Revocation Reason'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Archive Date'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Object Group'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Fresh'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Link'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Application Specific Information'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Contact Information'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'Last Change Date'
)
self.assertEqual(None, result)
result = e._get_attribute_from_managed_object(
symmetric_key,
'invalid'
)
self.assertEqual(None, result)
def test_set_attributes_on_managed_object(self):
"""
Test that multiple attributes can be set on a given managed object.
@ -3254,6 +3596,178 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_get_attributes(self):
"""
Test that a GetAttributes request can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
secret = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
e._data_session.add(secret)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
payload = get_attributes.GetAttributesRequestPayload(
unique_identifier='1',
attribute_names=['Object Type', 'Cryptographic Algorithm']
)
response_payload = e._process_get_attributes(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: GetAttributes"
)
self.assertEqual(
'1',
response_payload.unique_identifier
)
self.assertEqual(
2,
len(response_payload.attributes)
)
attribute_factory = factory.AttributeFactory()
attribute = attribute_factory.create_attribute(
enums.AttributeType.OBJECT_TYPE,
enums.ObjectType.SYMMETRIC_KEY
)
self.assertIn(attribute, response_payload.attributes)
attribute = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
self.assertIn(attribute, response_payload.attributes)
def test_get_attributes_with_no_arguments(self):
"""
Test that a GetAttributes request with no arguments can be processed
correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
secret = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
e._data_session.add(secret)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._id_placeholder = '1'
payload = get_attributes.GetAttributesRequestPayload()
response_payload = e._process_get_attributes(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: GetAttributes"
)
self.assertEqual(
'1',
response_payload.unique_identifier
)
self.assertEqual(
8,
len(response_payload.attributes)
)
attribute_factory = factory.AttributeFactory()
attribute = attribute_factory.create_attribute(
enums.AttributeType.OBJECT_TYPE,
enums.ObjectType.SYMMETRIC_KEY
)
self.assertIn(attribute, response_payload.attributes)
attribute = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
self.assertIn(attribute, response_payload.attributes)
attribute = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
0
)
self.assertIn(attribute, response_payload.attributes)
attribute = attribute_factory.create_attribute(
enums.AttributeType.OPERATION_POLICY_NAME,
'default'
)
self.assertIn(attribute, response_payload.attributes)
attribute = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[]
)
self.assertIn(attribute, response_payload.attributes)
attribute = attribute_factory.create_attribute(
enums.AttributeType.STATE,
enums.State.PRE_ACTIVE
)
self.assertIn(attribute, response_payload.attributes)
attribute = attribute_factory.create_attribute(
enums.AttributeType.UNIQUE_IDENTIFIER,
'1'
)
self.assertIn(attribute, response_payload.attributes)
def test_get_attributes_not_allowed_by_policy(self):
"""
Test that an unallowed request is handled correctly by GetAttributes.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._client_identity = 'test'
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
obj_a._owner = 'admin'
e._data_session.add(obj_a)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
payload = get_attributes.GetAttributesRequestPayload(
unique_identifier=id_a
)
# Test by specifying the ID of the object whose attributes should
# be retrieved.
args = [payload]
self.assertRaisesRegex(
exceptions.ItemNotFound,
"Could not locate object: {0}".format(id_a),
e._process_get_attributes,
*args
)
def test_activate(self):
"""
Test that an Activate request can be processed correctly.
@ -3565,7 +4079,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(6, len(result.operations))
self.assertEqual(7, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
@ -3583,13 +4097,17 @@ class TestKmipEngine(testtools.TestCase):
result.operations[3].value
)
self.assertEqual(
enums.Operation.DESTROY,
enums.Operation.GET_ATTRIBUTES,
result.operations[4].value
)
self.assertEqual(
enums.Operation.QUERY,
enums.Operation.DESTROY,
result.operations[5].value
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[6].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
@ -3608,7 +4126,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsNotNone(result.operations)
self.assertEqual(7, len(result.operations))
self.assertEqual(8, len(result.operations))
self.assertEqual(
enums.Operation.DISCOVER_VERSIONS,
result.operations[-1].value

View File

@ -112,3 +112,57 @@ class TestAttributePolicy(testtools.TestCase):
result = rules.is_attribute_multivalued(attribute_b)
self.assertTrue(result)
def test_get_all_attribute_names(self):
"""
Test that get_all_attribute_names returns a complete list of the
names of all spec-defined attributes.
"""
rules = policy.AttributePolicy(contents.ProtocolVersion.create(1, 0))
attribute_names = [
'Unique Identifier',
'Name',
'Object Type',
'Cryptographic Algorithm',
'Cryptographic Length',
'Cryptographic Parameters',
'Cryptographic Domain Parameters',
'Certificate Type',
'Certificate Length',
'X.509 Certificate Identifier',
'X.509 Certificate Subject',
'X.509 Certificate Issuer',
'Certificate Identifier',
'Certificate Subject',
'Certificate Issuer',
'Digital Signature Algorithm',
'Digest',
'Operation Policy Name',
'Cryptographic Usage Mask',
'Lease Time',
'Usage Limits',
'State',
'Initial Date',
'Activation Date',
'Process Start Date',
'Protect Stop Date',
'Deactivation Date',
'Destroy Date',
'Compromise Occurrence Date',
'Compromise Date',
'Revocation Reason',
'Archive Date',
'Object Group',
'Fresh',
'Link',
'Application Specific Information',
'Contact Information',
'Last Change Date',
'Custom Attribute'
]
result = rules.get_all_attribute_names()
self.assertEqual(len(attribute_names), len(result))
for attribute_name in attribute_names:
self.assertIn(attribute_name, result)