Pass the doc iterator directly to the bulk helpers

This change simplifies the logsender implementation by using
the iterator-based API of the Opensearch client.

Change-Id: Ia335fc9e91215356f72a1f3f18ae962c4f5d2150
This commit is contained in:
Tristan Cacqueray
2022-03-04 16:23:32 +00:00
committed by Daniel Pawlik
parent 5dfb4ae1e9
commit 2fb7befb4c
2 changed files with 149 additions and 91 deletions

View File

@@ -20,7 +20,6 @@ The goal is to get content from build uuid directory and send to Opensearch
"""
import argparse
import collections
import copy
import datetime
import itertools
@@ -81,8 +80,6 @@ def get_arguments():
default=1500)
parser.add_argument("--keep", help="Do not remove log directory after",
action="store_true")
parser.add_argument("--ignore-es-status", help="Ignore Opensearch bulk",
action="store_true")
parser.add_argument("--debug", help="Be more verbose",
action="store_true")
args = parser.parse_args()
@@ -180,30 +177,6 @@ def makeFields(build_details, buildinfo):
return fields
def send_bulk(es_client, request, workers, ignore_es_status, chunk_size):
    """Send bulk request to Opensearch.

    :param es_client: Opensearch client used for the bulk call
    :param request: iterable of bulk action documents to index
    :param workers: thread count passed to helpers.parallel_bulk
    :param ignore_es_status: when True, drain the bulk results without
        checking the per-chunk status
    :param chunk_size: number of documents per bulk chunk
    :returns: True when every chunk succeeded, a deque of results when
        ignore_es_status is set, or None on failure
    """
    try:
        if ignore_es_status:
            # parallel_bulk is a lazy generator; wrap it in a deque just
            # to consume it, without inspecting per-chunk status.
            return collections.deque(helpers.parallel_bulk(
                es_client, request, thread_count=workers,
                chunk_size=chunk_size))
        # NOTE: To see bulk update status, we can use:
        # https://elasticsearch-py.readthedocs.io/en/7.10.0/helpers.html#example
        for success, info in helpers.parallel_bulk(es_client, request,
                                                   thread_count=workers,
                                                   chunk_size=chunk_size):
            if not success:
                # Fixed message grammar: "was not send" -> "was not sent".
                logging.error("Chunk was not sent to Opensearch %s" % info)
                return
        # If all bulk updates are fine, return True
        return True
    except Exception as e:
        # Fixed message spelling: "occured" -> "occurred".
        logging.critical("Exception occurred on pushing data to "
                         "Opensearch %s" % e)
        return
def get_timestamp(line):
try:
timestamp_search = re.search(r'[-0-9]{10}\s+[0-9.:]{12}', line)
@@ -245,8 +218,7 @@ def logline_iter(build_file):
break
def logline_iter_chunk(inner, index, es_fields, doc_type, chunk_size):
chunk = []
def doc_iter(inner, index, es_fields, doc_type, chunk_size):
for (ts, line) in inner:
fields = copy.deepcopy(es_fields)
fields["@timestamp"] = ts
@@ -257,20 +229,23 @@ def logline_iter_chunk(inner, index, es_fields, doc_type, chunk_size):
fields["message"] = message
doc = {"_index": index, "_type": doc_type, "_source": fields}
chunk.append(doc)
if len(chunk) >= chunk_size:
yield chunk
chunk.clear()
yield chunk
yield doc
def send_to_es(build_file, es_fields, es_client, index, workers,
               chunk_size, doc_type):
    """Send documents generated from a build log file to Opensearch.

    Diff residue removed: the span previously contained both the old
    signature tail (with ignore_es_status) and the old chunked loop body
    alongside the new implementation, which is not valid Python.

    :param build_file: path of the log file to index
    :param es_fields: base fields attached to every generated document
    :param es_client: Opensearch client
    :param index: target index name
    :param workers: kept for interface compatibility with callers
    :param chunk_size: forwarded to doc_iter
    :param doc_type: Opensearch document type
    :returns: the helpers.bulk result, or None when sending failed
    """
    logging.info("Working on %s" % build_file)
    try:
        # Stream documents straight into the bulk helper instead of
        # batching chunks by hand.
        docs = doc_iter(
            logline_iter(build_file),
            index, es_fields, doc_type, chunk_size)
        return helpers.bulk(es_client, docs)
    except opensearch_exceptions.TransportError as e:
        logging.critical("Can not send message to Opensearch. Error: %s" % e)
    except Exception as e:
        # Fixed message spelling: "occured" -> "occurred".
        logging.critical("An error occurred on sending message to "
                         "Opensearch %s" % e)
def get_build_information(build_dir):
@@ -300,8 +275,7 @@ def send(ready_directory, args, directory, index, workers):
es_fields["filename"] = build_file
send_status = send_to_es("%s/%s" % (build_dir, build_file),
es_fields, es_client, index, workers,
args.ignore_es_status, args.chunk_size,
args.doc_type)
args.chunk_size, args.doc_type)
if args.keep:
logging.info("Keeping file %s" % build_dir)

View File

@@ -19,6 +19,7 @@ import io
from logscraper import logsender
from logscraper.tests import base
from opensearchpy.exceptions import TransportError
from ruamel.yaml import YAML
from unittest import mock
@@ -269,7 +270,7 @@ class FakeArgs(object):
def __init__(self, directory=None, host=None, port=None, username=None,
password=None, index_prefix=None, index=None, doc_type=None,
insecure=None, follow=None, workers=None, chunk_size=None,
keep=None, ignore_es_status=None, debug=None):
keep=None, debug=None):
self.directory = directory
self.host = host
@@ -284,7 +285,6 @@ class FakeArgs(object):
self.workers = workers
self.chunk_size = chunk_size
self.keep = keep
self.ignore_es_status = ignore_es_status
self.debug = debug
@@ -323,18 +323,41 @@ class TestSender(base.TestCase):
index = 'logstash-index'
workers = 1
args = logsender.get_arguments()
mock_send_to_es.return_value = True
# No metter what is ES status, it should keep dir
mock_send_to_es.return_value = None
logsender.send((build_uuid, build_files), args, directory, index,
workers)
self.assertFalse(mock_remove_dir.called)
@mock.patch('logscraper.logsender.remove_directory')
@mock.patch('logscraper.logsender.send_to_es')
@mock.patch('logscraper.logsender.get_build_information')
@mock.patch('logscraper.logsender.get_es_client')
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
    directory="/tmp/testdir", keep=False, doc_type="_doc"))
def test_send_error_keep_dir(self, mock_args, mock_es_client,
                             mock_build_info, mock_send_to_es,
                             mock_remove_dir):
    # Removed the stale @mock.patch('logscraper.logsender.send_bulk')
    # decorator left over from the old implementation: send_bulk no
    # longer exists and the extra patch did not match the five mock
    # parameters this method accepts.
    #
    # When send_to_es reports a failure (None), the build directory must
    # be kept so the data can be resent later.
    build_uuid = '38bf2cdc947643c9bb04f11f40a0f211'
    build_files = ['job-result.txt']
    directory = '/tmp/testdir'
    index = 'logstash-index'
    workers = 1
    args = logsender.get_arguments()
    mock_send_to_es.return_value = None
    logsender.send((build_uuid, build_files), args, directory, index,
                   workers)
    self.assertFalse(mock_remove_dir.called)
@mock.patch('logscraper.logsender.doc_iter')
@mock.patch('logscraper.logsender.logline_iter')
@mock.patch('opensearchpy.helpers.bulk')
@mock.patch('logscraper.logsender.open_file')
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
directory="/tmp/testdir", index="myindex", workers=1,
ignore_es_status=False, chunk_size=1000,
doc_type="zuul"))
def test_send_to_es(self, mock_args, mock_text, mock_bulk):
chunk_size=1000, doc_type="zuul"))
def test_send_to_es(self, mock_args, mock_text, mock_bulk, mock_doc_iter,
mock_logline_chunk):
build_file = 'job-result.txt'
es_fields = parsed_fields
es_client = mock.Mock()
@@ -419,53 +442,114 @@ class TestSender(base.TestCase):
'message': ' Preparing job workspace'
}
}]
mock_doc_iter.return_value = es_doc
logsender.send_to_es(build_file, es_fields, es_client, args.index,
args.workers, args.ignore_es_status,
args.chunk_size, args.doc_type)
mock_bulk.assert_called_once_with(es_client, es_doc, 1, False, 1000)
args.workers, args.chunk_size, args.doc_type)
self.assertEqual(1, mock_bulk.call_count)
@mock.patch('collections.deque')
@mock.patch('opensearchpy.helpers.parallel_bulk')
def test_send_bulk(self, mock_parallel_bulk, mock_deque):
    # NOTE(review): exercises send_bulk, which the diff above removes
    # from logsender — presumably this test is deleted by the same
    # commit; verify against the full test file.
    es_client = mock.MagicMock()
    # Both simulated chunks succeed, so send_bulk should return True.
    mock_parallel_bulk.return_value = [(True, "200"), (True, "200")]
    request = [{'some': 'info'}, {'other': 'info'}]
    workers = 1
    chunk_size = 1500
    ignore_es_status = False
    bulk = logsender.send_bulk(es_client, request, workers,
                               ignore_es_status, chunk_size)
    # ignore_es_status is False, so the deque fast path must not run.
    self.assertFalse(mock_deque.called)
    self.assertTrue(mock_parallel_bulk.called)
    self.assertTrue(bulk)
@mock.patch('collections.deque')
@mock.patch('opensearchpy.helpers.parallel_bulk')
def test_send_bulk_ignore_status(self, mock_parallel_bulk, mock_deque):
    # NOTE(review): exercises send_bulk, which the diff above removes
    # from logsender — presumably deleted by the same commit; verify
    # against the full test file.
    es_client = mock.MagicMock()
    request = [{'some': 'info'}, {'other': 'info'}]
    workers = 1
    chunk_size = 1500
    ignore_es_status = True
    logsender.send_bulk(es_client, request, workers, ignore_es_status,
                        chunk_size)
    # With ignore_es_status the results are drained via collections.deque
    # instead of being checked chunk by chunk.
    self.assertTrue(mock_deque.called)
    self.assertTrue(mock_parallel_bulk.called)
@mock.patch('logscraper.logsender.doc_iter')
@mock.patch('logscraper.logsender.logline_iter')
@mock.patch('opensearchpy.helpers.bulk')
@mock.patch('logscraper.logsender.open_file')
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
    directory="/tmp/testdir", index="myindex", workers=1,
    chunk_size=1000, doc_type="zuul"))
def test_send_to_es_error(self, mock_args, mock_text, mock_bulk,
                          mock_logline, mock_doc_iter):
    # Diff residue removed: this span interleaved the deleted
    # test_send_bulk_error (decorators, parallel_bulk setup and its
    # assertions) line-by-line with this new test; only the new test is
    # kept, matching the removal of send_bulk in this commit.
    #
    # A TransportError raised by helpers.bulk must be caught inside
    # send_to_es, which then returns None instead of propagating.
    build_file = 'job-result.txt'
    es_fields = parsed_fields
    es_client = mock.Mock()
    args = logsender.get_arguments()
    text = ["2022-02-28 09:39:09.596010 | Job console starting...",
            "2022-02-28 09:39:09.610160 | Updating repositories",
            "2022-02-28 09:39:09.996235 | Preparing job workspace"]
    mock_text.return_value = io.StringIO("\n".join(text))
    es_doc = [{
        '_index': 'myindex',
        '_type': 'zuul',
        '_source': {
            'build_node': 'zuul-executor',
            'build_name': 'openstack-tox-py39',
            'build_status': 'SUCCESS',
            'project': 'openstack/neutron',
            'voting': 1,
            'build_set': '52b29e0e716a4436bd20eed47fa396ce',
            'build_queue': 'check',
            'build_ref': 'refs/changes/61/829161/3',
            'build_branch': 'master',
            'build_change': 829161,
            'build_patchset': '3',
            'build_newrev': 'UNKNOWN',
            'build_uuid': '38bf2cdc947643c9bb04f11f40a0f211',
            'node_provider': 'local',
            'log_url':
            'https://somehost/829161/3/check/openstack-tox-py39/38bf2cd/',
            'tenant': 'openstack',
            'zuul_executor': 'ze07.opendev.org',
            '@timestamp': '2022-02-28T09:39:09.596000',
            'message': ' Job console starting...'
        }
    }]
    mock_doc_iter.return_value = es_doc
    mock_bulk.side_effect = TransportError(500, "InternalServerError", {
        "error": {
            "root_cause": [{
                "type": "error",
                "reason": "error reason"
            }]
        }
    })
    send_status = logsender.send_to_es(build_file, es_fields, es_client,
                                       args.index, args.workers,
                                       args.chunk_size, args.doc_type)
    self.assertIsNone(send_status)
@mock.patch('logscraper.logsender.logline_iter')
def test_doc_iter(self, mock_logline):
    """doc_iter wraps each (timestamp, line) pair into a bulk document."""
    timestamped_lines = [
        ('2022-02-28T09:39:09.596000',
         '2022-02-28 09:39:09.596010 | Job console starting...\n'),
        ('2022-02-28T09:39:09.610000',
         '2022-02-28 09:39:09.610160 | Updating repositories\n'),
    ]
    # One document per input line: the base fields are copied, the
    # timestamp is attached, and the message keeps its leading space
    # but loses the "date time |" prefix and trailing newline.
    expected = [{
        '_index': 'someindex',
        '_source': {
            '@timestamp': stamp,
            'field': 'test',
            'message': message
        },
        '_type': '_doc'
    } for stamp, message in [
        ('2022-02-28T09:39:09.596000', ' Job console starting...'),
        ('2022-02-28T09:39:09.610000', ' Updating repositories'),
    ]]
    generated = list(
        logsender.doc_iter(timestamped_lines, 'someindex',
                           {'field': 'test'}, '_doc', 1000))
    self.assertEqual(expected, generated)
def test_logline_iter(self):
    """logline_iter yields an (ISO timestamp, raw line) pair per log line."""
    raw_lines = [
        "2022-02-28 09:39:09.596010 | Job console starting...",
        "2022-02-28 09:39:09.610160 | Updating repositories",
        "2022-02-28 09:39:09.996235 | Preparing job workspace",
    ]
    text = "\n".join(raw_lines)
    # All lines but the last keep their trailing newline; timestamps are
    # truncated to millisecond precision in ISO format.
    expected_data = [
        ('2022-02-28T09:39:09.596000', raw_lines[0] + '\n'),
        ('2022-02-28T09:39:09.610000', raw_lines[1] + '\n'),
        ('2022-02-28T09:39:09.996000', raw_lines[2]),
    ]
    fake_file = mock.mock_open(read_data=text)
    with mock.patch('builtins.open', fake_file) as mocked_open_file:
        generated_text = list(logsender.logline_iter('nofile'))
        self.assertEqual(expected_data, generated_text)
        self.assertTrue(mocked_open_file.called)
@mock.patch('logscraper.logsender.read_yaml_file',
side_effect=[_parse_get_yaml(buildinfo),