diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 331a7f5..9787d86 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1332,15 +1332,36 @@ class KmipEngine(object): @_kmip_version_supported('1.0') def _process_locate(self, payload): - # TODO Currently the Locate operation will just return all the - # existing managed objects with the restriction of operation - # policy only. Need to implement other filtering logics in the - # SPEC (e.g., attribute, storage status mask and etc..) + # TODO: Need to complete the filtering logic based on all given + # objects in payload. self._logger.info("Processing operation: Locate") managed_objects = self._list_objects_with_access_controls( enums.Operation.LOCATE) + if payload.attributes: + + managed_objects_filtered = [] + + # Filter the objects based on given attributes. + # TODO: Currently will only filter for 'Name'. + # Needs to add other attributes. + for managed_object in managed_objects: + for attribute in payload.attributes: + attribute_name = attribute.attribute_name.value + attribute_value = attribute.attribute_value + attr = self._get_attribute_from_managed_object( + managed_object, attribute_name) + if attribute_name == 'Name': + names = attr + if attribute_value not in names: + break + # TODO: filtering on other attributes + else: + managed_objects_filtered.append(managed_object) + + managed_objects = managed_objects_filtered + unique_identifiers = [attributes.UniqueIdentifier( str(managed_object.unique_identifier)) for managed_object in managed_objects] diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 38181e2..092c901 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -3562,6 +3562,101 @@ class TestKmipEngine(testtools.TestCase): [uid.value for uid in response_payload.unique_identifiers] ) + def test_locate_with_name(self): + """ + Test locate operation when 'Name' attribute is given. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00') + obj_a = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, 128, key, name='name0') + obj_b = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.DES, 128, key, name='name0') + obj_c = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, 128, key, name='name1') + + e._data_session.add(obj_a) + e._data_session.add(obj_b) + e._data_session.add(obj_c) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + id_b = str(obj_b.unique_identifier) + id_c = str(obj_c.unique_identifier) + + attribute_factory = factory.AttributeFactory() + + # Locate the obj with name 'name0' + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'name0', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ), + ] + + payload = locate.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: Locate" + ) + + self.assertEqual( + len(response_payload.unique_identifiers), + 2 + ) + self.assertIn( + id_a, + [uid.value for uid in response_payload.unique_identifiers] + ) + self.assertIn( + id_b, + [uid.value for uid in response_payload.unique_identifiers] + ) + + # Locate the obj with name 'name1' + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'name1', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ), + ] + + payload = locate.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: Locate" + ) + + self.assertEqual( + len(response_payload.unique_identifiers), + 1 + ) + self.assertIn( + id_c, + [uid.value for uid in response_payload.unique_identifiers] + ) + def test_get(self): """ Test that a Get request can be processed correctly.