diff --git a/logscraper/logsender.py b/logscraper/logsender.py index 27adcf8..f026c00 100755 --- a/logscraper/logsender.py +++ b/logscraper/logsender.py @@ -20,7 +20,6 @@ The goal is to get content from build uuid directory and send to Opensearch """ import argparse -import collections import copy import datetime import itertools @@ -81,8 +80,6 @@ def get_arguments(): default=1500) parser.add_argument("--keep", help="Do not remove log directory after", action="store_true") - parser.add_argument("--ignore-es-status", help="Ignore Opensearch bulk", - action="store_true") parser.add_argument("--debug", help="Be more verbose", action="store_true") args = parser.parse_args() @@ -180,30 +177,6 @@ def makeFields(build_details, buildinfo): return fields -def send_bulk(es_client, request, workers, ignore_es_status, chunk_size): - """Send bulk request to Opensearch""" - try: - if ignore_es_status: - return collections.deque(helpers.parallel_bulk( - es_client, request, thread_count=workers, - chunk_size=chunk_size)) - - # NOTE: To see bulk update status, we can use: - # https://elasticsearch-py.readthedocs.io/en/7.10.0/helpers.html#example - for success, info in helpers.parallel_bulk(es_client, request, - thread_count=workers, - chunk_size=chunk_size): - if not success: - logging.error("Chunk was not send to Opensearch %s" % info) - return - # If all bulk updates are fine, return True - return True - except Exception as e: - logging.critical("Exception occured on pushing data to " - "Opensearch %s" % e) - return - - def get_timestamp(line): try: timestamp_search = re.search(r'[-0-9]{10}\s+[0-9.:]{12}', line) @@ -245,8 +218,7 @@ def logline_iter(build_file): break -def logline_iter_chunk(inner, index, es_fields, doc_type, chunk_size): - chunk = [] +def doc_iter(inner, index, es_fields, doc_type, chunk_size): for (ts, line) in inner: fields = copy.deepcopy(es_fields) fields["@timestamp"] = ts @@ -257,20 +229,23 @@ def logline_iter_chunk(inner, index, es_fields, doc_type, 
chunk_size): fields["message"] = message doc = {"_index": index, "_type": doc_type, "_source": fields} - chunk.append(doc) - if len(chunk) >= chunk_size: - yield chunk - chunk.clear() - yield chunk + yield doc def send_to_es(build_file, es_fields, es_client, index, workers, - ignore_es_status, chunk_size, doc_type): + chunk_size, doc_type): """Send document to the Opensearch""" logging.info("Working on %s" % build_file) - for chunk in logline_iter_chunk( - logline_iter(build_file), index, es_fields, doc_type, chunk_size): - send_bulk(es_client, chunk, workers, ignore_es_status, chunk_size) + try: + docs = doc_iter( + logline_iter(build_file), + index, es_fields, doc_type, chunk_size) + return helpers.bulk(es_client, docs) + except opensearch_exceptions.TransportError as e: + logging.critical("Can not send message to Opensearch. Error: %s" % e) + except Exception as e: + logging.critical("An error occurred on sending message to " + "Opensearch %s" % e) def get_build_information(build_dir): @@ -300,8 +275,7 @@ def send(ready_directory, args, directory, index, workers): es_fields["filename"] = build_file send_status = send_to_es("%s/%s" % (build_dir, build_file), es_fields, es_client, index, workers, - args.ignore_es_status, args.chunk_size, - args.doc_type) + args.chunk_size, args.doc_type) if args.keep: logging.info("Keeping file %s" % build_dir) diff --git a/logscraper/tests/test_logsender.py b/logscraper/tests/test_logsender.py index b8721ce..cba9213 100755 --- a/logscraper/tests/test_logsender.py +++ b/logscraper/tests/test_logsender.py @@ -19,6 +19,7 @@ import io from logscraper import logsender from logscraper.tests import base +from opensearchpy.exceptions import TransportError from ruamel.yaml import YAML from unittest import mock @@ -269,7 +270,7 @@ class FakeArgs(object): def __init__(self, directory=None, host=None, port=None, username=None, password=None, index_prefix=None, index=None, doc_type=None, insecure=None, follow=None, workers=None, 
chunk_size=None, - keep=None, ignore_es_status=None, debug=None): + keep=None, debug=None): self.directory = directory self.host = host @@ -284,7 +285,6 @@ class FakeArgs(object): self.workers = workers self.chunk_size = chunk_size self.keep = keep - self.ignore_es_status = ignore_es_status self.debug = debug @@ -323,18 +323,41 @@ class TestSender(base.TestCase): index = 'logstash-index' workers = 1 args = logsender.get_arguments() - mock_send_to_es.return_value = True + # No matter what the ES status is, it should keep dir + mock_send_to_es.return_value = None logsender.send((build_uuid, build_files), args, directory, index, workers) self.assertFalse(mock_remove_dir.called) - @mock.patch('logscraper.logsender.send_bulk') + @mock.patch('logscraper.logsender.remove_directory') + @mock.patch('logscraper.logsender.send_to_es') + @mock.patch('logscraper.logsender.get_build_information') + @mock.patch('logscraper.logsender.get_es_client') + @mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs( + directory="/tmp/testdir", keep=False, doc_type="_doc")) + def test_send_error_keep_dir(self, mock_args, mock_es_client, + mock_build_info, mock_send_to_es, + mock_remove_dir): + build_uuid = '38bf2cdc947643c9bb04f11f40a0f211' + build_files = ['job-result.txt'] + directory = '/tmp/testdir' + index = 'logstash-index' + workers = 1 + args = logsender.get_arguments() + mock_send_to_es.return_value = None + logsender.send((build_uuid, build_files), args, directory, index, + workers) + self.assertFalse(mock_remove_dir.called) + + @mock.patch('logscraper.logsender.doc_iter') + @mock.patch('logscraper.logsender.logline_iter') + @mock.patch('opensearchpy.helpers.bulk') @mock.patch('logscraper.logsender.open_file') @mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs( directory="/tmp/testdir", index="myindex", workers=1, - ignore_es_status=False, chunk_size=1000, - doc_type="zuul")) - def test_send_to_es(self, mock_args, mock_text, mock_bulk): + 
chunk_size=1000, doc_type="zuul")) + def test_send_to_es(self, mock_args, mock_text, mock_bulk, mock_doc_iter, + mock_logline_chunk): build_file = 'job-result.txt' es_fields = parsed_fields es_client = mock.Mock() @@ -419,53 +442,114 @@ class TestSender(base.TestCase): 'message': ' Preparing job workspace' } }] + mock_doc_iter.return_value = es_doc logsender.send_to_es(build_file, es_fields, es_client, args.index, - args.workers, args.ignore_es_status, - args.chunk_size, args.doc_type) - mock_bulk.assert_called_once_with(es_client, es_doc, 1, False, 1000) + args.workers, args.chunk_size, args.doc_type) + self.assertEqual(1, mock_bulk.call_count) - @mock.patch('collections.deque') - @mock.patch('opensearchpy.helpers.parallel_bulk') - def test_send_bulk(self, mock_parallel_bulk, mock_deque): - es_client = mock.MagicMock() - mock_parallel_bulk.return_value = [(True, "200"), (True, "200")] - request = [{'some': 'info'}, {'other': 'info'}] - workers = 1 - chunk_size = 1500 - ignore_es_status = False - bulk = logsender.send_bulk(es_client, request, workers, - ignore_es_status, chunk_size) - self.assertFalse(mock_deque.called) - self.assertTrue(mock_parallel_bulk.called) - self.assertTrue(bulk) - - @mock.patch('collections.deque') - @mock.patch('opensearchpy.helpers.parallel_bulk') - def test_send_bulk_ignore_status(self, mock_parallel_bulk, mock_deque): - es_client = mock.MagicMock() - request = [{'some': 'info'}, {'other': 'info'}] - workers = 1 - chunk_size = 1500 - ignore_es_status = True - logsender.send_bulk(es_client, request, workers, ignore_es_status, - chunk_size) - self.assertTrue(mock_deque.called) - self.assertTrue(mock_parallel_bulk.called) - - @mock.patch('collections.deque') - @mock.patch('opensearchpy.helpers.parallel_bulk') - def test_send_bulk_error(self, mock_parallel_bulk, mock_deque): + @mock.patch('logscraper.logsender.doc_iter') + @mock.patch('logscraper.logsender.logline_iter') + @mock.patch('opensearchpy.helpers.bulk') + 
@mock.patch('logscraper.logsender.open_file') + @mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs( + directory="/tmp/testdir", index="myindex", workers=1, + chunk_size=1000, doc_type="zuul")) + def test_send_to_es_error(self, mock_args, mock_text, mock_bulk, + mock_logline, mock_doc_iter): + build_file = 'job-result.txt' + es_fields = parsed_fields es_client = mock.Mock() - mock_parallel_bulk.return_value = [(True, "200"), (False, "500")] - request = [{'some': 'info'}, {'other': 'info'}] - workers = 1 - chunk_size = 1500 - ignore_es_status = False - bulk = logsender.send_bulk(es_client, request, workers, - ignore_es_status, chunk_size) - self.assertFalse(mock_deque.called) - self.assertTrue(mock_parallel_bulk.called) - self.assertIsNone(bulk) + args = logsender.get_arguments() + text = ["2022-02-28 09:39:09.596010 | Job console starting...", + "2022-02-28 09:39:09.610160 | Updating repositories", + "2022-02-28 09:39:09.996235 | Preparing job workspace"] + mock_text.return_value = io.StringIO("\n".join(text)) + es_doc = [{ + '_index': 'myindex', + '_type': 'zuul', + '_source': { + 'build_node': 'zuul-executor', + 'build_name': 'openstack-tox-py39', + 'build_status': 'SUCCESS', + 'project': 'openstack/neutron', + 'voting': 1, + 'build_set': '52b29e0e716a4436bd20eed47fa396ce', + 'build_queue': 'check', + 'build_ref': 'refs/changes/61/829161/3', + 'build_branch': 'master', + 'build_change': 829161, + 'build_patchset': '3', + 'build_newrev': 'UNKNOWN', + 'build_uuid': '38bf2cdc947643c9bb04f11f40a0f211', + 'node_provider': 'local', + 'log_url': + 'https://somehost/829161/3/check/openstack-tox-py39/38bf2cd/', + 'tenant': 'openstack', + 'zuul_executor': 'ze07.opendev.org', + '@timestamp': '2022-02-28T09:39:09.596000', + 'message': ' Job console starting...' 
+ } + }] + mock_doc_iter.return_value = es_doc + mock_bulk.side_effect = TransportError(500, "InternalServerError", { + "error": { + "root_cause": [{ + "type": "error", + "reason": "error reason" + }] + } + }) + send_status = logsender.send_to_es(build_file, es_fields, es_client, + args.index, args.workers, + args.chunk_size, args.doc_type) + self.assertIsNone(send_status) + + @mock.patch('logscraper.logsender.logline_iter') + def test_doc_iter(self, mock_logline): + text = [('2022-02-28T09:39:09.596000', + '2022-02-28 09:39:09.596010 | Job console starting...\n'), + ('2022-02-28T09:39:09.610000', + '2022-02-28 09:39:09.610160 | Updating repositories\n')] + expected_chunk = [{ + '_index': 'someindex', + '_source': { + '@timestamp': '2022-02-28T09:39:09.596000', + 'field': 'test', + 'message': ' Job console starting...' + }, + '_type': '_doc' + }, { + '_index': 'someindex', + '_source': { + '@timestamp': '2022-02-28T09:39:09.610000', + 'field': 'test', + 'message': ' Updating repositories' + }, + '_type': '_doc' + }] + chunk_text = list( + logsender.doc_iter(text, 'someindex', {'field': 'test'}, '_doc', + 1000)) + self.assertEqual(expected_chunk, chunk_text) + + def test_logline_iter(self): + text = """2022-02-28 09:39:09.596010 | Job console starting... 
+2022-02-28 09:39:09.610160 | Updating repositories +2022-02-28 09:39:09.996235 | Preparing job workspace""" + + expected_data = [ + ('2022-02-28T09:39:09.596000', + '2022-02-28 09:39:09.596010 | Job console starting...\n'), + ('2022-02-28T09:39:09.610000', + '2022-02-28 09:39:09.610160 | Updating repositories\n'), + ('2022-02-28T09:39:09.996000', + '2022-02-28 09:39:09.996235 | Preparing job workspace') + ] + readed_data = mock.mock_open(read_data=text) + with mock.patch('builtins.open', readed_data) as mocked_open_file: + generated_text = list(logsender.logline_iter('nofile')) + self.assertEqual(expected_data, generated_text) + self.assertTrue(mocked_open_file.called) @mock.patch('logscraper.logsender.read_yaml_file', side_effect=[_parse_get_yaml(buildinfo),