Introduce a chunk iterator

This change avoids loading the whole logfile document in memory.

Change-Id: I39ece6a32467670f5861ff806c5151f94b0d1860
This commit is contained in:
Tristan Cacqueray
2022-03-04 16:10:01 +00:00
committed by daniel.pawlik
parent 7f8e4d1b6d
commit 5dfb4ae1e9

View File

@@ -245,12 +245,9 @@ def logline_iter(build_file):
break
def send_to_es(build_file, es_fields, es_client, index, workers,
ignore_es_status, chunk_size, doc_type):
"""Send document to the Opensearch"""
request = []
logging.info("Working on %s" % build_file)
for (ts, line) in logline_iter(build_file):
def logline_iter_chunk(inner, index, es_fields, doc_type, chunk_size):
chunk = []
for (ts, line) in inner:
fields = copy.deepcopy(es_fields)
fields["@timestamp"] = ts
@@ -260,8 +257,20 @@ def send_to_es(build_file, es_fields, es_client, index, workers,
fields["message"] = message
doc = {"_index": index, "_type": doc_type, "_source": fields}
request.append(doc)
return send_bulk(es_client, request, workers, ignore_es_status, chunk_size)
chunk.append(doc)
if len(chunk) >= chunk_size:
yield chunk
chunk.clear()
yield chunk
def send_to_es(build_file, es_fields, es_client, index, workers,
ignore_es_status, chunk_size, doc_type):
"""Send document to the Opensearch"""
logging.info("Working on %s" % build_file)
for chunk in logline_iter_chunk(
logline_iter(build_file), index, es_fields, doc_type, chunk_size):
send_bulk(es_client, chunk, workers, ignore_es_status, chunk_size)
def get_build_information(build_dir):