Merge pull request #242 from OpenKMIP/feat/add-get-attribute-list-to-server
Adding server support for the GetAttributeList operation
This commit is contained in:
commit
068c7f5d99
@ -16,7 +16,6 @@
|
||||
import six
|
||||
|
||||
from kmip.core import enums
|
||||
from kmip.core import exceptions
|
||||
from kmip.core import primitives
|
||||
from kmip.core import utils
|
||||
|
||||
@ -30,23 +29,44 @@ class GetAttributeListRequestPayload(primitives.Struct):
|
||||
See Section 4.13 of the KMIP 1.1 specification for more information.
|
||||
|
||||
Attributes:
|
||||
uid: The unique ID of the managed object with which the retrieved
|
||||
attributes should be associated.
|
||||
unique_identifier: The unique ID of the managed object with which the
|
||||
retrieved attributes should be associated.
|
||||
"""
|
||||
def __init__(self, uid=None):
|
||||
|
||||
def __init__(self, unique_identifier=None):
|
||||
"""
|
||||
Construct a GetAttributeList request payload.
|
||||
|
||||
Args:
|
||||
uid (string): The ID of the managed object with which the retrieved
|
||||
attributes should be associated. Optional, defaults to None.
|
||||
unique_identifier (string): The ID of the managed object with
|
||||
which the retrieved attribute names should be associated.
|
||||
Optional, defaults to None.
|
||||
"""
|
||||
super(GetAttributeListRequestPayload, self).__init__(
|
||||
enums.Tags.REQUEST_PAYLOAD)
|
||||
|
||||
self.uid = uid
|
||||
self._unique_identifier = None
|
||||
|
||||
self.validate()
|
||||
self.unique_identifier = unique_identifier
|
||||
|
||||
@property
|
||||
def unique_identifier(self):
|
||||
if self._unique_identifier:
|
||||
return self._unique_identifier.value
|
||||
else:
|
||||
return self._unique_identifier
|
||||
|
||||
@unique_identifier.setter
|
||||
def unique_identifier(self, value):
|
||||
if value is None:
|
||||
self._unique_identifier = None
|
||||
elif isinstance(value, six.string_types):
|
||||
self._unique_identifier = primitives.TextString(
|
||||
value=value,
|
||||
tag=enums.Tags.UNIQUE_IDENTIFIER
|
||||
)
|
||||
else:
|
||||
raise TypeError("unique identifier must be a string")
|
||||
|
||||
def read(self, istream):
|
||||
"""
|
||||
@ -61,14 +81,14 @@ class GetAttributeListRequestPayload(primitives.Struct):
|
||||
tstream = utils.BytearrayStream(istream.read(self.length))
|
||||
|
||||
if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream):
|
||||
uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER)
|
||||
uid.read(tstream)
|
||||
self.uid = uid.value
|
||||
self._unique_identifier = primitives.TextString(
|
||||
tag=enums.Tags.UNIQUE_IDENTIFIER
|
||||
)
|
||||
self._unique_identifier.read(tstream)
|
||||
else:
|
||||
self.uid = None
|
||||
self._unique_identifier = None
|
||||
|
||||
self.is_oversized(tstream)
|
||||
self.validate()
|
||||
|
||||
def write(self, ostream):
|
||||
"""
|
||||
@ -81,36 +101,23 @@ class GetAttributeListRequestPayload(primitives.Struct):
|
||||
"""
|
||||
tstream = utils.BytearrayStream()
|
||||
|
||||
if self.uid:
|
||||
uid = primitives.TextString(
|
||||
value=self.uid, tag=enums.Tags.UNIQUE_IDENTIFIER)
|
||||
uid.write(tstream)
|
||||
if self._unique_identifier:
|
||||
self._unique_identifier.write(tstream)
|
||||
|
||||
self.length = tstream.length()
|
||||
super(GetAttributeListRequestPayload, self).write(ostream)
|
||||
ostream.write(tstream.buffer)
|
||||
|
||||
def validate(self):
|
||||
"""
|
||||
Error check the attributes of the GetAttributeList request payload.
|
||||
"""
|
||||
if self.uid is not None:
|
||||
if not isinstance(self.uid, six.string_types):
|
||||
raise TypeError(
|
||||
"uid must be a string; "
|
||||
"expected (one of): {0}, observed: {1}".format(
|
||||
six.string_types, type(self.uid)))
|
||||
|
||||
def __repr__(self):
|
||||
uid = "uid={0}".format(self.uid)
|
||||
uid = "unique_identifier={0}".format(self.unique_identifier)
|
||||
return "GetAttributeListRequestPayload({0})".format(uid)
|
||||
|
||||
def __str__(self):
|
||||
return str({'uid': self.uid})
|
||||
return str({'unique_identifier': self.unique_identifier})
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, GetAttributeListRequestPayload):
|
||||
if self.uid == other.uid:
|
||||
if self.unique_identifier == other.unique_identifier:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
@ -128,45 +135,100 @@ class GetAttributeListResponsePayload(primitives.Struct):
|
||||
"""
|
||||
A response payload for the GetAttributeList operation.
|
||||
|
||||
The payload will contain the ID of the managed object with which the
|
||||
attributes are associated. It will also contain a list of attribute names
|
||||
The payload contains the ID of the managed object with which the
|
||||
attributes are associated, along with a list of attribute names
|
||||
identifying the types of attributes associated with the aforementioned
|
||||
managed object. See Section 4.13 of the KMIP 1.1 specification for more
|
||||
information.
|
||||
managed object.
|
||||
|
||||
Attributes:
|
||||
uid: The unique ID of the managed object with which the retrieved
|
||||
attributes should be associated.
|
||||
attribute_names: The list of attribute names of the attributes
|
||||
associated with managed object identified by the uid above.
|
||||
unique_identifier: The unique ID of the managed object with which the
|
||||
retrieved attributes should be associated.
|
||||
attribute_names: A list of strings identifying the names of the
|
||||
attributes associated with the managed object.
|
||||
"""
|
||||
def __init__(self, uid=None, attribute_names=None):
|
||||
|
||||
def __init__(self, unique_identifier=None, attribute_names=None):
|
||||
"""
|
||||
Construct a GetAttributeList response payload.
|
||||
|
||||
Args:
|
||||
uid (string): The ID of the managed object with which the retrieved
|
||||
attributes should be associated. Optional, defaults to None.
|
||||
attribute_names (list): A list of strings identifying the names of
|
||||
the attributes associated with the managed object. Optional,
|
||||
unique_identifier (string): The ID of the managed object with
|
||||
which the retrieved attribute names should be associated.
|
||||
Optional, defaults to None.
|
||||
attribute_names: A list of strings identifying the names of the
|
||||
attributes associated with the managed object. Optional,
|
||||
defaults to None.
|
||||
"""
|
||||
|
||||
super(GetAttributeListResponsePayload, self).__init__(
|
||||
enums.Tags.RESPONSE_PAYLOAD)
|
||||
enums.Tags.RESPONSE_PAYLOAD
|
||||
)
|
||||
|
||||
self.uid = uid
|
||||
self._unique_identifier = None
|
||||
self._attribute_names = list()
|
||||
|
||||
if attribute_names:
|
||||
self.attribute_names = attribute_names
|
||||
self.unique_identifier = unique_identifier
|
||||
self.attribute_names = attribute_names
|
||||
|
||||
@property
|
||||
def unique_identifier(self):
|
||||
if self._unique_identifier:
|
||||
return self._unique_identifier.value
|
||||
else:
|
||||
self.attribute_names = list()
|
||||
return self._unique_identifier
|
||||
|
||||
self.validate()
|
||||
@unique_identifier.setter
|
||||
def unique_identifier(self, value):
|
||||
if value is None:
|
||||
self._unique_identifier = None
|
||||
elif isinstance(value, six.string_types):
|
||||
self._unique_identifier = primitives.TextString(
|
||||
value=value,
|
||||
tag=enums.Tags.UNIQUE_IDENTIFIER
|
||||
)
|
||||
else:
|
||||
raise TypeError("unique identifier must be a string")
|
||||
|
||||
@property
|
||||
def attribute_names(self):
|
||||
if self._attribute_names:
|
||||
names = list()
|
||||
for attribute_name in self._attribute_names:
|
||||
names.append(attribute_name.value)
|
||||
return names
|
||||
else:
|
||||
return self._attribute_names
|
||||
|
||||
@attribute_names.setter
|
||||
def attribute_names(self, value):
|
||||
if value is None:
|
||||
self._attribute_names = list()
|
||||
elif isinstance(value, list):
|
||||
names = list()
|
||||
for i in range(len(value)):
|
||||
name = value[i]
|
||||
if not isinstance(name, six.string_types):
|
||||
raise TypeError(
|
||||
"attribute_names must be a list of strings; "
|
||||
"item {0} has type {1}".format(i + 1, type(name))
|
||||
)
|
||||
if name not in names:
|
||||
names.append(name)
|
||||
self._attribute_names = list()
|
||||
for name in names:
|
||||
self._attribute_names.append(
|
||||
primitives.TextString(
|
||||
value=name,
|
||||
tag=enums.Tags.ATTRIBUTE_NAME
|
||||
)
|
||||
)
|
||||
else:
|
||||
raise TypeError("attribute_names must be a list of strings")
|
||||
|
||||
def read(self, istream):
|
||||
"""
|
||||
Read the data encoding the GetAttributeList response payload and decode
|
||||
it into its constituent parts.
|
||||
Read the data encoding the GetAttributeList response payload and
|
||||
decode it into its constituent parts.
|
||||
|
||||
Args:
|
||||
istream (stream): A data stream containing encoded object data,
|
||||
@ -176,22 +238,21 @@ class GetAttributeListResponsePayload(primitives.Struct):
|
||||
tstream = utils.BytearrayStream(istream.read(self.length))
|
||||
|
||||
if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream):
|
||||
uid = primitives.TextString(tag=enums.Tags.UNIQUE_IDENTIFIER)
|
||||
uid.read(tstream)
|
||||
self.uid = uid.value
|
||||
self._unique_identifier = primitives.TextString(
|
||||
tag=enums.Tags.UNIQUE_IDENTIFIER
|
||||
)
|
||||
self._unique_identifier.read(tstream)
|
||||
else:
|
||||
raise exceptions.InvalidKmipEncoding(
|
||||
"expected uid encoding not found")
|
||||
self._unique_identifier = None
|
||||
|
||||
names = list()
|
||||
while(self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, tstream)):
|
||||
while self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, tstream):
|
||||
name = primitives.TextString(tag=enums.Tags.ATTRIBUTE_NAME)
|
||||
name.read(tstream)
|
||||
names.append(name.value)
|
||||
self.attribute_names = names
|
||||
names.append(name)
|
||||
self._attribute_names = names
|
||||
|
||||
self.is_oversized(tstream)
|
||||
self.validate()
|
||||
|
||||
def write(self, ostream):
|
||||
"""
|
||||
@ -204,59 +265,39 @@ class GetAttributeListResponsePayload(primitives.Struct):
|
||||
"""
|
||||
tstream = utils.BytearrayStream()
|
||||
|
||||
uid = primitives.TextString(
|
||||
value=self.uid, tag=enums.Tags.UNIQUE_IDENTIFIER)
|
||||
uid.write(tstream)
|
||||
if self._unique_identifier:
|
||||
self._unique_identifier.write(tstream)
|
||||
|
||||
for name in self.attribute_names:
|
||||
name = primitives.TextString(
|
||||
value=name, tag=enums.Tags.ATTRIBUTE_NAME)
|
||||
name.write(tstream)
|
||||
for attribute_name in self._attribute_names:
|
||||
attribute_name.write(tstream)
|
||||
|
||||
self.length = tstream.length()
|
||||
super(GetAttributeListResponsePayload, self).write(ostream)
|
||||
ostream.write(tstream.buffer)
|
||||
|
||||
def validate(self):
|
||||
"""
|
||||
Error check the attributes of the GetAttributeList response payload.
|
||||
"""
|
||||
if self.uid is not None:
|
||||
if not isinstance(self.uid, six.string_types):
|
||||
raise TypeError(
|
||||
"uid must be a string; "
|
||||
"expected (one of): {0}, observed: {1}".format(
|
||||
six.string_types, type(self.uid)))
|
||||
|
||||
if self.attribute_names:
|
||||
if not isinstance(self.attribute_names, list):
|
||||
raise TypeError("attribute names must be a list")
|
||||
for i in range(len(self.attribute_names)):
|
||||
name = self.attribute_names[i]
|
||||
if not isinstance(name, six.string_types):
|
||||
raise TypeError(
|
||||
"attribute name ({0} of {1}) must be a string".format(
|
||||
i + 1, len(self.attribute_names)))
|
||||
|
||||
def __repr__(self):
|
||||
uid = "uid={0}".format(self.uid)
|
||||
names = "attribute_names={0}".format(self.attribute_names)
|
||||
return "GetAttributeListResponsePayload({0}, {1})".format(uid, names)
|
||||
unique_identifier = "unique_identifier={0}".format(
|
||||
self.unique_identifier
|
||||
)
|
||||
attribute_names = "attribute_names={0}".format(self.attribute_names)
|
||||
return "GetAttributeListResponsePayload({0}, {1})".format(
|
||||
unique_identifier,
|
||||
attribute_names
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return str({'uid': self.uid, 'attribute_names': self.attribute_names})
|
||||
return str({
|
||||
'unique_identifier': self.unique_identifier,
|
||||
'attribute_names': self.attribute_names
|
||||
})
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, GetAttributeListResponsePayload):
|
||||
if self.uid != other.uid:
|
||||
return False
|
||||
elif ((isinstance(self.attribute_names, list) and
|
||||
isinstance(other.attribute_names, list)) and
|
||||
len(self.attribute_names) == len(other.attribute_names)):
|
||||
for name in self.attribute_names:
|
||||
if name not in other.attribute_names:
|
||||
return False
|
||||
return True
|
||||
if self.unique_identifier == other.unique_identifier:
|
||||
if set(self.attribute_names) == set(other.attribute_names):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
|
@ -591,7 +591,7 @@ class KMIPProxy(KMIP):
|
||||
names = None
|
||||
|
||||
if payload:
|
||||
uid = payload.uid
|
||||
uid = payload.unique_identifier
|
||||
names = payload.attribute_names
|
||||
|
||||
return GetAttributeListResult(
|
||||
|
@ -43,6 +43,7 @@ from kmip.core.messages.payloads import destroy
|
||||
from kmip.core.messages.payloads import discover_versions
|
||||
from kmip.core.messages.payloads import get
|
||||
from kmip.core.messages.payloads import get_attributes
|
||||
from kmip.core.messages.payloads import get_attribute_list
|
||||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import register
|
||||
|
||||
@ -911,6 +912,8 @@ class KmipEngine(object):
|
||||
return self._process_get(payload)
|
||||
elif operation == enums.Operation.GET_ATTRIBUTES:
|
||||
return self._process_get_attributes(payload)
|
||||
elif operation == enums.Operation.GET_ATTRIBUTE_LIST:
|
||||
return self._process_get_attribute_list(payload)
|
||||
elif operation == enums.Operation.ACTIVATE:
|
||||
return self._process_activate(payload)
|
||||
elif operation == enums.Operation.DESTROY:
|
||||
@ -1380,6 +1383,51 @@ class KmipEngine(object):
|
||||
|
||||
return response_payload
|
||||
|
||||
@_kmip_version_supported('1.0')
|
||||
def _process_get_attribute_list(self, payload):
|
||||
self._logger.info("Processing operation: GetAttributeList")
|
||||
|
||||
if payload.unique_identifier:
|
||||
unique_identifier = payload.unique_identifier
|
||||
else:
|
||||
unique_identifier = self._id_placeholder
|
||||
|
||||
object_type = self._get_object_type(unique_identifier)
|
||||
|
||||
managed_object = self._data_session.query(object_type).filter(
|
||||
object_type.unique_identifier == unique_identifier
|
||||
).one()
|
||||
|
||||
# Determine if the request should be carried out under the object's
|
||||
# operation policy. If not, feign ignorance of the object.
|
||||
is_allowed = self._is_allowed_by_operation_policy(
|
||||
managed_object.operation_policy_name,
|
||||
self._client_identity,
|
||||
managed_object._owner,
|
||||
managed_object._object_type,
|
||||
enums.Operation.GET_ATTRIBUTES
|
||||
)
|
||||
if not is_allowed:
|
||||
raise exceptions.ItemNotFound(
|
||||
"Could not locate object: {0}".format(unique_identifier)
|
||||
)
|
||||
|
||||
object_attributes = self._get_attributes_from_managed_object(
|
||||
managed_object,
|
||||
list()
|
||||
)
|
||||
attribute_names = list()
|
||||
|
||||
for object_attribute in object_attributes:
|
||||
attribute_names.append(object_attribute.attribute_name.value)
|
||||
|
||||
response_payload = get_attribute_list.GetAttributeListResponsePayload(
|
||||
unique_identifier=unique_identifier,
|
||||
attribute_names=attribute_names
|
||||
)
|
||||
|
||||
return response_payload
|
||||
|
||||
@_kmip_version_supported('1.0')
|
||||
def _process_activate(self, payload):
|
||||
self._logger.info("Processing operation: Activate")
|
||||
@ -1502,6 +1550,7 @@ class KmipEngine(object):
|
||||
contents.Operation(enums.Operation.REGISTER),
|
||||
contents.Operation(enums.Operation.GET),
|
||||
contents.Operation(enums.Operation.GET_ATTRIBUTES),
|
||||
contents.Operation(enums.Operation.GET_ATTRIBUTE_LIST),
|
||||
contents.Operation(enums.Operation.ACTIVATE),
|
||||
contents.Operation(enums.Operation.DESTROY),
|
||||
contents.Operation(enums.Operation.QUERY)
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -44,6 +44,7 @@ from kmip.core.messages.payloads import create_key_pair
|
||||
from kmip.core.messages.payloads import destroy
|
||||
from kmip.core.messages.payloads import discover_versions
|
||||
from kmip.core.messages.payloads import get
|
||||
from kmip.core.messages.payloads import get_attribute_list
|
||||
from kmip.core.messages.payloads import get_attributes
|
||||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import register
|
||||
@ -883,6 +884,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e._process_register = mock.MagicMock()
|
||||
e._process_get = mock.MagicMock()
|
||||
e._process_get_attributes = mock.MagicMock()
|
||||
e._process_get_attribute_list = mock.MagicMock()
|
||||
e._process_activate = mock.MagicMock()
|
||||
e._process_destroy = mock.MagicMock()
|
||||
e._process_query = mock.MagicMock()
|
||||
@ -893,6 +895,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e._process_operation(enums.Operation.REGISTER, None)
|
||||
e._process_operation(enums.Operation.GET, None)
|
||||
e._process_operation(enums.Operation.GET_ATTRIBUTES, None)
|
||||
e._process_operation(enums.Operation.GET_ATTRIBUTE_LIST, None)
|
||||
e._process_operation(enums.Operation.ACTIVATE, None)
|
||||
e._process_operation(enums.Operation.DESTROY, None)
|
||||
e._process_operation(enums.Operation.QUERY, None)
|
||||
@ -903,6 +906,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e._process_register.assert_called_with(None)
|
||||
e._process_get.assert_called_with(None)
|
||||
e._process_get_attributes.assert_called_with(None)
|
||||
e._process_get_attribute_list.assert_called_with(None)
|
||||
e._process_activate.assert_called_with(None)
|
||||
e._process_destroy.assert_called_with(None)
|
||||
e._process_query.assert_called_with(None)
|
||||
@ -3866,6 +3870,184 @@ class TestKmipEngine(testtools.TestCase):
|
||||
*args
|
||||
)
|
||||
|
||||
def test_get_attribute_list(self):
|
||||
"""
|
||||
Test that a GetAttributeList request can be processed correctly.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
secret = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
0,
|
||||
b''
|
||||
)
|
||||
|
||||
e._data_session.add(secret)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
payload = get_attribute_list.GetAttributeListRequestPayload(
|
||||
unique_identifier='1'
|
||||
)
|
||||
|
||||
response_payload = e._process_get_attribute_list(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_any_call(
|
||||
"Processing operation: GetAttributeList"
|
||||
)
|
||||
self.assertEqual(
|
||||
'1',
|
||||
response_payload.unique_identifier
|
||||
)
|
||||
self.assertEqual(
|
||||
8,
|
||||
len(response_payload.attribute_names)
|
||||
)
|
||||
self.assertIn(
|
||||
"Object Type",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Name",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Cryptographic Algorithm",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Cryptographic Length",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Operation Policy Name",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Cryptographic Usage Mask",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"State",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Unique Identifier",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
|
||||
def test_get_attribute_list_with_no_arguments(self):
|
||||
"""
|
||||
Test that a GetAttributeList request with no arguments can be
|
||||
processed correctly.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
secret = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
0,
|
||||
b''
|
||||
)
|
||||
|
||||
e._data_session.add(secret)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._id_placeholder = '1'
|
||||
|
||||
payload = get_attribute_list.GetAttributeListRequestPayload()
|
||||
|
||||
response_payload = e._process_get_attribute_list(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_any_call(
|
||||
"Processing operation: GetAttributeList"
|
||||
)
|
||||
self.assertEqual(
|
||||
'1',
|
||||
response_payload.unique_identifier
|
||||
)
|
||||
self.assertEqual(
|
||||
8,
|
||||
len(response_payload.attribute_names)
|
||||
)
|
||||
self.assertIn(
|
||||
"Object Type",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Name",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Cryptographic Algorithm",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Cryptographic Length",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Operation Policy Name",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Cryptographic Usage Mask",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"State",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
self.assertIn(
|
||||
"Unique Identifier",
|
||||
response_payload.attribute_names
|
||||
)
|
||||
|
||||
def test_get_attribute_list_not_allowed_by_policy(self):
|
||||
"""
|
||||
Test that an unallowed request is handled correctly by
|
||||
GetAttributeList.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
e._client_identity = 'test'
|
||||
|
||||
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
|
||||
obj_a._owner = 'admin'
|
||||
|
||||
e._data_session.add(obj_a)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
id_a = str(obj_a.unique_identifier)
|
||||
payload = get_attribute_list.GetAttributeListRequestPayload(
|
||||
unique_identifier=id_a
|
||||
)
|
||||
|
||||
# Test by specifying the ID of the object whose attributes should
|
||||
# be retrieved.
|
||||
args = [payload]
|
||||
self.assertRaisesRegex(
|
||||
exceptions.ItemNotFound,
|
||||
"Could not locate object: {0}".format(id_a),
|
||||
e._process_get_attribute_list,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_activate(self):
|
||||
"""
|
||||
Test that an Activate request can be processed correctly.
|
||||
@ -4180,7 +4362,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
e._logger.info.assert_called_once_with("Processing operation: Query")
|
||||
self.assertIsInstance(result, query.QueryResponsePayload)
|
||||
self.assertIsNotNone(result.operations)
|
||||
self.assertEqual(8, len(result.operations))
|
||||
self.assertEqual(9, len(result.operations))
|
||||
self.assertEqual(
|
||||
enums.Operation.CREATE,
|
||||
result.operations[0].value
|
||||
@ -4202,17 +4384,21 @@ class TestKmipEngine(testtools.TestCase):
|
||||
result.operations[4].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.ACTIVATE,
|
||||
enums.Operation.GET_ATTRIBUTE_LIST,
|
||||
result.operations[5].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.DESTROY,
|
||||
enums.Operation.ACTIVATE,
|
||||
result.operations[6].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.QUERY,
|
||||
enums.Operation.DESTROY,
|
||||
result.operations[7].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.QUERY,
|
||||
result.operations[8].value
|
||||
)
|
||||
self.assertEqual(list(), result.object_types)
|
||||
self.assertIsNotNone(result.vendor_identification)
|
||||
self.assertEqual(
|
||||
@ -4231,7 +4417,7 @@ class TestKmipEngine(testtools.TestCase):
|
||||
|
||||
e._logger.info.assert_called_once_with("Processing operation: Query")
|
||||
self.assertIsNotNone(result.operations)
|
||||
self.assertEqual(9, len(result.operations))
|
||||
self.assertEqual(10, len(result.operations))
|
||||
self.assertEqual(
|
||||
enums.Operation.DISCOVER_VERSIONS,
|
||||
result.operations[-1].value
|
||||
|
@ -421,7 +421,7 @@ class TestKMIPClient(TestCase):
|
||||
self.assertIsInstance(
|
||||
batch_item.request_payload,
|
||||
get_attribute_list.GetAttributeListRequestPayload)
|
||||
self.assertEqual(uid, batch_item.request_payload.uid)
|
||||
self.assertEqual(uid, batch_item.request_payload.unique_identifier)
|
||||
|
||||
def test_process_batch_items(self):
|
||||
batch_item = ResponseBatchItem(
|
||||
@ -631,7 +631,7 @@ class TestKMIPClient(TestCase):
|
||||
uid = '00000000-1111-2222-3333-444444444444'
|
||||
names = ['Cryptographic Algorithm', 'Cryptographic Length']
|
||||
payload = get_attribute_list.GetAttributeListResponsePayload(
|
||||
uid=uid, attribute_names=names)
|
||||
unique_identifier=uid, attribute_names=names)
|
||||
batch_item = ResponseBatchItem(
|
||||
operation=Operation(OperationEnum.GET_ATTRIBUTE_LIST),
|
||||
response_payload=payload)
|
||||
|
Loading…
Reference in New Issue
Block a user