Merge pull request #298 from vbnmmnbv/locate_attributes_server
Add Name attribute filtering of locate for server
This commit is contained in:
commit
62d30be7fa
@ -1332,15 +1332,36 @@ class KmipEngine(object):
|
|||||||
|
|
||||||
@_kmip_version_supported('1.0')
|
@_kmip_version_supported('1.0')
|
||||||
def _process_locate(self, payload):
|
def _process_locate(self, payload):
|
||||||
# TODO Currently the Locate operation will just return all the
|
# TODO: Need to complete the filtering logic based on all given
|
||||||
# existing managed objects with the restriction of operation
|
# objects in payload.
|
||||||
# policy only. Need to implement other filtering logics in the
|
|
||||||
# SPEC (e.g., attribute, storage status mask and etc..)
|
|
||||||
self._logger.info("Processing operation: Locate")
|
self._logger.info("Processing operation: Locate")
|
||||||
|
|
||||||
managed_objects = self._list_objects_with_access_controls(
|
managed_objects = self._list_objects_with_access_controls(
|
||||||
enums.Operation.LOCATE)
|
enums.Operation.LOCATE)
|
||||||
|
|
||||||
|
if payload.attributes:
|
||||||
|
|
||||||
|
managed_objects_filtered = []
|
||||||
|
|
||||||
|
# Filter the objects based on given attributes.
|
||||||
|
# TODO: Currently will only filter for 'Name'.
|
||||||
|
# Needs to add other attributes.
|
||||||
|
for managed_object in managed_objects:
|
||||||
|
for attribute in payload.attributes:
|
||||||
|
attribute_name = attribute.attribute_name.value
|
||||||
|
attribute_value = attribute.attribute_value
|
||||||
|
attr = self._get_attribute_from_managed_object(
|
||||||
|
managed_object, attribute_name)
|
||||||
|
if attribute_name == 'Name':
|
||||||
|
names = attr
|
||||||
|
if attribute_value not in names:
|
||||||
|
break
|
||||||
|
# TODO: filtering on other attributes
|
||||||
|
else:
|
||||||
|
managed_objects_filtered.append(managed_object)
|
||||||
|
|
||||||
|
managed_objects = managed_objects_filtered
|
||||||
|
|
||||||
unique_identifiers = [attributes.UniqueIdentifier(
|
unique_identifiers = [attributes.UniqueIdentifier(
|
||||||
str(managed_object.unique_identifier))
|
str(managed_object.unique_identifier))
|
||||||
for managed_object in managed_objects]
|
for managed_object in managed_objects]
|
||||||
|
@ -3562,6 +3562,101 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
[uid.value for uid in response_payload.unique_identifiers]
|
[uid.value for uid in response_payload.unique_identifiers]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_locate_with_name(self):
|
||||||
|
"""
|
||||||
|
Test locate operation when 'Name' attribute is given.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._data_store = self.engine
|
||||||
|
e._data_store_session_factory = self.session_factory
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
|
||||||
|
key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
b'\x00\x00\x00\x00\x00')
|
||||||
|
obj_a = pie_objects.SymmetricKey(
|
||||||
|
enums.CryptographicAlgorithm.AES, 128, key, name='name0')
|
||||||
|
obj_b = pie_objects.SymmetricKey(
|
||||||
|
enums.CryptographicAlgorithm.DES, 128, key, name='name0')
|
||||||
|
obj_c = pie_objects.SymmetricKey(
|
||||||
|
enums.CryptographicAlgorithm.AES, 128, key, name='name1')
|
||||||
|
|
||||||
|
e._data_session.add(obj_a)
|
||||||
|
e._data_session.add(obj_b)
|
||||||
|
e._data_session.add(obj_c)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
id_a = str(obj_a.unique_identifier)
|
||||||
|
id_b = str(obj_b.unique_identifier)
|
||||||
|
id_c = str(obj_c.unique_identifier)
|
||||||
|
|
||||||
|
attribute_factory = factory.AttributeFactory()
|
||||||
|
|
||||||
|
# Locate the obj with name 'name0'
|
||||||
|
attrs = [
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.NAME,
|
||||||
|
attributes.Name.create(
|
||||||
|
'name0',
|
||||||
|
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||||
|
)
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
payload = locate.LocateRequestPayload(attributes=attrs)
|
||||||
|
e._logger.reset_mock()
|
||||||
|
response_payload = e._process_locate(payload)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
e._logger.info.assert_any_call(
|
||||||
|
"Processing operation: Locate"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
len(response_payload.unique_identifiers),
|
||||||
|
2
|
||||||
|
)
|
||||||
|
self.assertIn(
|
||||||
|
id_a,
|
||||||
|
[uid.value for uid in response_payload.unique_identifiers]
|
||||||
|
)
|
||||||
|
self.assertIn(
|
||||||
|
id_b,
|
||||||
|
[uid.value for uid in response_payload.unique_identifiers]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Locate the obj with name 'name1'
|
||||||
|
attrs = [
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.NAME,
|
||||||
|
attributes.Name.create(
|
||||||
|
'name1',
|
||||||
|
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||||
|
)
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
payload = locate.LocateRequestPayload(attributes=attrs)
|
||||||
|
e._logger.reset_mock()
|
||||||
|
response_payload = e._process_locate(payload)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
e._logger.info.assert_any_call(
|
||||||
|
"Processing operation: Locate"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
len(response_payload.unique_identifiers),
|
||||||
|
1
|
||||||
|
)
|
||||||
|
self.assertIn(
|
||||||
|
id_c,
|
||||||
|
[uid.value for uid in response_payload.unique_identifiers]
|
||||||
|
)
|
||||||
|
|
||||||
def test_get(self):
|
def test_get(self):
|
||||||
"""
|
"""
|
||||||
Test that a Get request can be processed correctly.
|
Test that a Get request can be processed correctly.
|
||||||
|
Loading…
Reference in New Issue
Block a user