Add option to skip DEBUG messages
The DEBUG messages can take a lot of space which can be skipped when Opensearch cluster is out of space. Change-Id: Iced777c2435c73ab155a6d650c22e78107190810
This commit is contained in:
@@ -21,6 +21,7 @@
|
|||||||
logscraper_workers: 4
|
logscraper_workers: 4
|
||||||
logsender_workers: 10
|
logsender_workers: 10
|
||||||
logsender_wait_time: 10
|
logsender_wait_time: 10
|
||||||
|
skip_debug: true
|
||||||
roles:
|
roles:
|
||||||
- logscraper
|
- logscraper
|
||||||
- logsender
|
- logsender
|
||||||
|
|||||||
@@ -27,4 +27,5 @@ container_images:
|
|||||||
# ignore_es_status: false
|
# ignore_es_status: false
|
||||||
# logsender_wait_time: 60
|
# logsender_wait_time: 60
|
||||||
# custom_ca_crt: ""
|
# custom_ca_crt: ""
|
||||||
|
# skip_debug: false
|
||||||
tenant_builds: []
|
tenant_builds: []
|
||||||
|
|||||||
@@ -60,4 +60,7 @@
|
|||||||
{% if 'custom_ca_crt' in item %}
|
{% if 'custom_ca_crt' in item %}
|
||||||
--ca-file {{ item['custom_ca_crt'] }} \
|
--ca-file {{ item['custom_ca_crt'] }} \
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
{% if 'skip_debug' in item and item['skip_debug'] %}
|
||||||
|
--skip-debug \
|
||||||
|
{% endif %}
|
||||||
--follow
|
--follow
|
||||||
|
|||||||
@@ -81,6 +81,9 @@ def get_arguments():
|
|||||||
parser.add_argument("--chunk-size", help="The bulk chunk size",
|
parser.add_argument("--chunk-size", help="The bulk chunk size",
|
||||||
type=int,
|
type=int,
|
||||||
default=1500)
|
default=1500)
|
||||||
|
parser.add_argument("--skip-debug", help="Skip messages that contain: "
|
||||||
|
"DEBUG word",
|
||||||
|
action="store_true")
|
||||||
parser.add_argument("--keep", help="Do not remove log directory after",
|
parser.add_argument("--keep", help="Do not remove log directory after",
|
||||||
action="store_true")
|
action="store_true")
|
||||||
parser.add_argument("--debug", help="Be more verbose",
|
parser.add_argument("--debug", help="Be more verbose",
|
||||||
@@ -270,13 +273,14 @@ def json_iter(build_file):
|
|||||||
yield (ts, json.dumps(parse_file))
|
yield (ts, json.dumps(parse_file))
|
||||||
|
|
||||||
|
|
||||||
def logline_iter(build_file):
|
def logline_iter(build_file, skip_debug):
|
||||||
last_known_timestamp = None
|
last_known_timestamp = None
|
||||||
with open_file(build_file) as f:
|
with open_file(build_file) as f:
|
||||||
while True:
|
while True:
|
||||||
line = f.readline()
|
line = f.readline()
|
||||||
if last_known_timestamp is None and line.startswith(
|
if (last_known_timestamp is None and line.startswith(
|
||||||
"-- Logs begin at "):
|
"-- Logs begin at ")) or (skip_debug and
|
||||||
|
'DEBUG' in line):
|
||||||
continue
|
continue
|
||||||
if line:
|
if line:
|
||||||
ts = get_timestamp(line)
|
ts = get_timestamp(line)
|
||||||
@@ -299,13 +303,15 @@ def doc_iter(inner, index, es_fields, doc_type):
|
|||||||
message = get_message(line)
|
message = get_message(line)
|
||||||
if not message:
|
if not message:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
fields["message"] = message
|
fields["message"] = message
|
||||||
|
|
||||||
doc = {"_index": index, "_type": doc_type, "_source": fields}
|
doc = {"_index": index, "_type": doc_type, "_source": fields}
|
||||||
yield doc
|
yield doc
|
||||||
|
|
||||||
|
|
||||||
def send_to_es(build_file, es_fields, es_client, index, chunk_size, doc_type):
|
def send_to_es(build_file, es_fields, es_client, index, chunk_size, doc_type,
|
||||||
|
skip_debug):
|
||||||
"""Send document to the Opensearch"""
|
"""Send document to the Opensearch"""
|
||||||
logging.info("Working on %s" % build_file)
|
logging.info("Working on %s" % build_file)
|
||||||
|
|
||||||
@@ -314,7 +320,8 @@ def send_to_es(build_file, es_fields, es_client, index, chunk_size, doc_type):
|
|||||||
docs = doc_iter(json_iter(build_file), index, es_fields, doc_type)
|
docs = doc_iter(json_iter(build_file), index, es_fields, doc_type)
|
||||||
return helpers.bulk(es_client, docs, chunk_size=chunk_size)
|
return helpers.bulk(es_client, docs, chunk_size=chunk_size)
|
||||||
|
|
||||||
docs = doc_iter(logline_iter(build_file), index, es_fields, doc_type)
|
docs = doc_iter(logline_iter(build_file, skip_debug), index, es_fields,
|
||||||
|
doc_type)
|
||||||
return helpers.bulk(es_client, docs, chunk_size=chunk_size)
|
return helpers.bulk(es_client, docs, chunk_size=chunk_size)
|
||||||
except opensearch_exceptions.TransportError as e:
|
except opensearch_exceptions.TransportError as e:
|
||||||
logging.critical("Can not send message to Opensearch. Error: %s" % e)
|
logging.critical("Can not send message to Opensearch. Error: %s" % e)
|
||||||
@@ -355,7 +362,7 @@ def send(ready_directory, args, directory, index):
|
|||||||
fields['tags'] = file_tags
|
fields['tags'] = file_tags
|
||||||
send_status = send_to_es("%s/%s" % (build_dir, build_file),
|
send_status = send_to_es("%s/%s" % (build_dir, build_file),
|
||||||
fields, es_client, index, args.chunk_size,
|
fields, es_client, index, args.chunk_size,
|
||||||
args.doc_type)
|
args.doc_type, args.skip_debug)
|
||||||
|
|
||||||
if args.keep:
|
if args.keep:
|
||||||
logging.info("Keeping file %s" % build_dir)
|
logging.info("Keeping file %s" % build_dir)
|
||||||
|
|||||||
@@ -272,7 +272,8 @@ class FakeArgs(object):
|
|||||||
def __init__(self, config=None, directory=None, host=None, port=None,
|
def __init__(self, config=None, directory=None, host=None, port=None,
|
||||||
username=None, password=None, index_prefix=None, index=None,
|
username=None, password=None, index_prefix=None, index=None,
|
||||||
doc_type=None, insecure=None, follow=None, workers=None,
|
doc_type=None, insecure=None, follow=None, workers=None,
|
||||||
chunk_size=None, keep=None, debug=None, wait_time=None):
|
chunk_size=None, skip_debug=None, keep=None, debug=None,
|
||||||
|
wait_time=None):
|
||||||
|
|
||||||
self.config = config
|
self.config = config
|
||||||
self.directory = directory
|
self.directory = directory
|
||||||
@@ -287,6 +288,7 @@ class FakeArgs(object):
|
|||||||
self.follow = follow
|
self.follow = follow
|
||||||
self.workers = workers
|
self.workers = workers
|
||||||
self.chunk_size = chunk_size
|
self.chunk_size = chunk_size
|
||||||
|
self.skip_debug = skip_debug
|
||||||
self.keep = keep
|
self.keep = keep
|
||||||
self.debug = debug
|
self.debug = debug
|
||||||
self.wait_time = wait_time
|
self.wait_time = wait_time
|
||||||
@@ -334,7 +336,7 @@ class TestSender(base.TestCase):
|
|||||||
self.assertTrue(mock_remove_dir.called)
|
self.assertTrue(mock_remove_dir.called)
|
||||||
mock_send_to_es.assert_called_with(
|
mock_send_to_es.assert_called_with(
|
||||||
"%s/%s/job-result.txt" % (directory, build_uuid), expected_fields,
|
"%s/%s/job-result.txt" % (directory, build_uuid), expected_fields,
|
||||||
'fake_client_object', index, None, '_doc')
|
'fake_client_object', index, None, '_doc', None)
|
||||||
|
|
||||||
@mock.patch('logscraper.logsender.get_file_info')
|
@mock.patch('logscraper.logsender.get_file_info')
|
||||||
@mock.patch('logscraper.logsender.remove_directory')
|
@mock.patch('logscraper.logsender.remove_directory')
|
||||||
@@ -384,7 +386,7 @@ class TestSender(base.TestCase):
|
|||||||
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
|
||||||
directory="/tmp/testdir", index="myindex", workers=1,
|
directory="/tmp/testdir", index="myindex", workers=1,
|
||||||
chunk_size=1000, doc_type="zuul",
|
chunk_size=1000, doc_type="zuul",
|
||||||
config='config.yaml'))
|
config='config.yaml', skip_debug=False))
|
||||||
def test_send_to_es(self, mock_args, mock_text, mock_bulk, mock_doc_iter,
|
def test_send_to_es(self, mock_args, mock_text, mock_bulk, mock_doc_iter,
|
||||||
mock_logline_chunk, mock_file_info):
|
mock_logline_chunk, mock_file_info):
|
||||||
build_file = 'job-result.txt'
|
build_file = 'job-result.txt'
|
||||||
@@ -479,7 +481,7 @@ class TestSender(base.TestCase):
|
|||||||
}]
|
}]
|
||||||
mock_doc_iter.return_value = es_doc
|
mock_doc_iter.return_value = es_doc
|
||||||
logsender.send_to_es(build_file, es_fields, es_client, args.index,
|
logsender.send_to_es(build_file, es_fields, es_client, args.index,
|
||||||
args.chunk_size, args.doc_type)
|
args.chunk_size, args.doc_type, args.skip_debug)
|
||||||
self.assertEqual(1, mock_bulk.call_count)
|
self.assertEqual(1, mock_bulk.call_count)
|
||||||
|
|
||||||
@mock.patch('logscraper.logsender.get_file_info')
|
@mock.patch('logscraper.logsender.get_file_info')
|
||||||
@@ -490,7 +492,7 @@ class TestSender(base.TestCase):
|
|||||||
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
|
||||||
directory="/tmp/testdir", index="myindex", workers=1,
|
directory="/tmp/testdir", index="myindex", workers=1,
|
||||||
chunk_size=1000, doc_type="zuul",
|
chunk_size=1000, doc_type="zuul",
|
||||||
config='test.yaml'))
|
config='test.yaml', skip_debug=False))
|
||||||
def test_send_to_es_error(self, mock_args, mock_text, mock_bulk,
|
def test_send_to_es_error(self, mock_args, mock_text, mock_bulk,
|
||||||
mock_logline, mock_doc_iter, mock_file_info):
|
mock_logline, mock_doc_iter, mock_file_info):
|
||||||
build_file = 'job-result.txt'
|
build_file = 'job-result.txt'
|
||||||
@@ -538,7 +540,7 @@ class TestSender(base.TestCase):
|
|||||||
})
|
})
|
||||||
send_status = logsender.send_to_es(build_file, es_fields, es_client,
|
send_status = logsender.send_to_es(build_file, es_fields, es_client,
|
||||||
args.index, args.chunk_size,
|
args.index, args.chunk_size,
|
||||||
args.doc_type)
|
args.doc_type, args.skip_debug)
|
||||||
self.assertIsNone(send_status)
|
self.assertIsNone(send_status)
|
||||||
|
|
||||||
@mock.patch('json.load')
|
@mock.patch('json.load')
|
||||||
@@ -548,7 +550,7 @@ class TestSender(base.TestCase):
|
|||||||
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
|
||||||
directory="/tmp/testdir", index="myindex", workers=1,
|
directory="/tmp/testdir", index="myindex", workers=1,
|
||||||
chunk_size=1000, doc_type="zuul",
|
chunk_size=1000, doc_type="zuul",
|
||||||
config='test.yaml'))
|
config='test.yaml', skip_debug=False))
|
||||||
def test_send_to_es_json(self, mock_args, mock_text, mock_bulk,
|
def test_send_to_es_json(self, mock_args, mock_text, mock_bulk,
|
||||||
mock_file_info, mock_json_load):
|
mock_file_info, mock_json_load):
|
||||||
build_file = 'performance.json'
|
build_file = 'performance.json'
|
||||||
@@ -593,10 +595,60 @@ class TestSender(base.TestCase):
|
|||||||
}, '_type': 'zuul'
|
}, '_type': 'zuul'
|
||||||
}
|
}
|
||||||
logsender.send_to_es(build_file, es_fields, es_client, args.index,
|
logsender.send_to_es(build_file, es_fields, es_client, args.index,
|
||||||
args.chunk_size, args.doc_type)
|
args.chunk_size, args.doc_type, args.skip_debug)
|
||||||
self.assertEqual(es_doc, list(mock_bulk.call_args.args[1])[0])
|
self.assertEqual(es_doc, list(mock_bulk.call_args.args[1])[0])
|
||||||
self.assertEqual(1, mock_bulk.call_count)
|
self.assertEqual(1, mock_bulk.call_count)
|
||||||
|
|
||||||
|
@mock.patch('logscraper.logsender.get_file_info')
|
||||||
|
@mock.patch('logscraper.logsender.doc_iter')
|
||||||
|
@mock.patch('logscraper.logsender.logline_iter')
|
||||||
|
@mock.patch('opensearchpy.helpers.bulk')
|
||||||
|
@mock.patch('logscraper.logsender.open_file')
|
||||||
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
|
||||||
|
directory="/tmp/testdir", index="myindex", workers=1,
|
||||||
|
chunk_size=1000, doc_type="zuul",
|
||||||
|
config='test.yaml', skip_debug=True))
|
||||||
|
def test_send_to_es_skip_debug(self, mock_args, mock_text, mock_bulk,
|
||||||
|
mock_logline, mock_doc_iter,
|
||||||
|
mock_file_info):
|
||||||
|
build_file = 'job-result.txt'
|
||||||
|
es_fields = parsed_fields
|
||||||
|
es_client = mock.Mock()
|
||||||
|
args = logsender.get_arguments()
|
||||||
|
text = ["2022-02-28 09:39:09.596010 | Job console starting...",
|
||||||
|
"2022-02-28 09:39:09.610160 | DEBUG Updating repositories",
|
||||||
|
"2022-02-28 09:39:09.996235 | DEBUG Preparing job workspace"]
|
||||||
|
mock_text.return_value = io.StringIO("\n".join(text))
|
||||||
|
es_doc = [{
|
||||||
|
'_index': 'myindex',
|
||||||
|
'_source': {
|
||||||
|
'@timestamp': '2022-02-28T09:39:09.596000',
|
||||||
|
'build_branch': 'master',
|
||||||
|
'build_change': 829161,
|
||||||
|
'build_name': 'openstack-tox-py39',
|
||||||
|
'build_newrev': 'UNKNOWN',
|
||||||
|
'build_node': 'zuul-executor',
|
||||||
|
'build_patchset': '3',
|
||||||
|
'build_queue': 'check',
|
||||||
|
'build_ref': 'refs/changes/61/829161/3',
|
||||||
|
'build_set': '52b29e0e716a4436bd20eed47fa396ce',
|
||||||
|
'build_status': 'SUCCESS',
|
||||||
|
'build_uuid': '38bf2cdc947643c9bb04f11f40a0f211',
|
||||||
|
'log_url':
|
||||||
|
'https://somehost/829161/3/check/openstack-tox-py39/38bf2cd/',
|
||||||
|
'message': ' Job console starting...',
|
||||||
|
'node_provider': 'local',
|
||||||
|
'project': 'openstack/neutron',
|
||||||
|
'tenant': 'openstack',
|
||||||
|
'voting': 1,
|
||||||
|
'zuul_executor': 'ze07.opendev.org'},
|
||||||
|
'_type': 'zuul'}]
|
||||||
|
mock_doc_iter.return_value = es_doc
|
||||||
|
logsender.send_to_es(build_file, es_fields, es_client, args.index,
|
||||||
|
args.chunk_size, args.doc_type, args.skip_debug)
|
||||||
|
self.assertEqual(es_doc, list(mock_bulk.call_args.args[1]))
|
||||||
|
self.assertEqual(1, mock_bulk.call_count)
|
||||||
|
|
||||||
@mock.patch('logscraper.logsender.logline_iter')
|
@mock.patch('logscraper.logsender.logline_iter')
|
||||||
def test_doc_iter(self, mock_logline):
|
def test_doc_iter(self, mock_logline):
|
||||||
text = [(datetime.datetime(2022, 2, 28, 9, 39, 9, 596000),
|
text = [(datetime.datetime(2022, 2, 28, 9, 39, 9, 596000),
|
||||||
@@ -637,9 +689,10 @@ class TestSender(base.TestCase):
|
|||||||
(datetime.datetime(2022, 2, 28, 9, 39, 9, 996000),
|
(datetime.datetime(2022, 2, 28, 9, 39, 9, 996000),
|
||||||
'2022-02-28 09:39:09.996 | Preparing job workspace')
|
'2022-02-28 09:39:09.996 | Preparing job workspace')
|
||||||
]
|
]
|
||||||
|
skip_debug = False
|
||||||
readed_data = mock.mock_open(read_data=text)
|
readed_data = mock.mock_open(read_data=text)
|
||||||
with mock.patch('builtins.open', readed_data) as mocked_open_file:
|
with mock.patch('builtins.open', readed_data) as mocked_open_file:
|
||||||
generated_text = list(logsender.logline_iter('nofile'))
|
generated_text = list(logsender.logline_iter('nofile', skip_debug))
|
||||||
self.assertEqual(expected_data, generated_text)
|
self.assertEqual(expected_data, generated_text)
|
||||||
self.assertTrue(mocked_open_file.called)
|
self.assertTrue(mocked_open_file.called)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user