Parse timestamps in log files correctly
The log files are not in the same time format, so they needs to be parsed to iso format. Change-Id: I812b3f66f2d3a93e066746861409ca6c071e485d
This commit is contained in:
@@ -18,6 +18,7 @@
|
||||
es_port: 9200
|
||||
es_insecure: true
|
||||
es_index: logstash-logscraper
|
||||
logscraper_dir: /etc/logscraper
|
||||
tasks:
|
||||
- name: Run check services
|
||||
include_role:
|
||||
|
||||
@@ -3,6 +3,7 @@ logsender_user: logscraper
|
||||
logsender_group: logscraper
|
||||
logscraper_gid: 10210
|
||||
logscraper_uid: 10210
|
||||
logscraper_dir: /etc/logscraper
|
||||
|
||||
container_images:
|
||||
# FIXME: Create new project on Docker hub that will contain that image
|
||||
|
||||
@@ -8,8 +8,10 @@
|
||||
--uidmap 1000:{{ logscraper_uid }}:1 \
|
||||
--name logsender-{{ item.tenant }} \
|
||||
--volume {{ item.download_dir }}:{{ item.download_dir }}:z \
|
||||
--volume {{ item.logscraper_dir | default(logscraper_dir) }}:{{ logscraper_dir }}:z \
|
||||
{{ container_images['logsender'] }} \
|
||||
/usr/local/bin/logsender \
|
||||
--config {{ logscraper_dir }}/config.yaml \
|
||||
{% if 'es_host' in item and item['es_host'] %}
|
||||
--host "{{ item['es_host'] }}" \
|
||||
{% endif %}
|
||||
|
||||
@@ -21,11 +21,13 @@ The goal is to get content from build uuid directory and send to Opensearch
|
||||
|
||||
import argparse
|
||||
import copy
|
||||
import datefinder
|
||||
import datetime
|
||||
import itertools
|
||||
import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
import parsedatetime
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
@@ -43,6 +45,8 @@ from ruamel.yaml import YAML
|
||||
def get_arguments():
|
||||
parser = argparse.ArgumentParser(description="Check log directories "
|
||||
"and push to the Opensearch service")
|
||||
parser.add_argument("--config", help="Logscraper config file",
|
||||
required=True)
|
||||
parser.add_argument("--directory",
|
||||
help="Directory, where the logs will "
|
||||
"be stored. Defaults to: /tmp/logscraper",
|
||||
@@ -185,18 +189,51 @@ def makeFields(build_inventory, buildinfo):
|
||||
return fields
|
||||
|
||||
|
||||
def get_timestamp(line):
|
||||
def alt_parse_timeformat(line):
|
||||
"""Return isoformat datetime if all parsing fails"""
|
||||
# The paredatetime lib is almost perfect converting timestamp, but
|
||||
# in few logs it increse/decrease year or hour.
|
||||
# Use it as a last hope.
|
||||
p = parsedatetime.Calendar()
|
||||
parse_time = p.parse(line)
|
||||
if parse_time:
|
||||
return datetime.datetime.fromtimestamp(
|
||||
time.mktime(parse_time[0]))
|
||||
|
||||
|
||||
def parse_text_timeformat(line):
|
||||
try:
|
||||
timestamp_search = re.search(r'[-0-9]{10}\s+[0-9.:]{12}', line)
|
||||
timestamp = (timestamp_search.group() if timestamp_search else
|
||||
datetime.datetime.utcnow().isoformat())
|
||||
# NOTE: On python 3.6, it should be:
|
||||
# datetime.datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f")
|
||||
# Ci-log-processing is using container with Python 3.8, where
|
||||
# fromisoformat attribute is available.
|
||||
return datetime.datetime.fromisoformat(timestamp).isoformat()
|
||||
except Exception as e:
|
||||
logging.critical("Exception occured on parsing timestamp %s" % e)
|
||||
# check it time is parsed like:
|
||||
# Mar 16 18:41:47 fedora-host some info
|
||||
timestamp_search = re.search(r'\w{3}\s[0-9]{2}\s[0-9.:]{8}',
|
||||
line.split("|", 1)[0])
|
||||
if timestamp_search:
|
||||
return datetime.datetime.strptime(
|
||||
timestamp_search.group(), '%b %d %H:%M:%S').replace(
|
||||
year=datetime.date.today().year)
|
||||
except TypeError:
|
||||
return alt_parse_timeformat(line)
|
||||
|
||||
|
||||
def find_time_in_line(line):
|
||||
try:
|
||||
text_timeformat = parse_text_timeformat(line)
|
||||
if text_timeformat:
|
||||
return text_timeformat
|
||||
|
||||
found_date = list(datefinder.find_dates(line))
|
||||
if found_date:
|
||||
return found_date[0]
|
||||
except TypeError:
|
||||
return alt_parse_timeformat(line)
|
||||
|
||||
|
||||
def get_timestamp(line):
|
||||
"""Parse log time format like and return in iso format"""
|
||||
time = find_time_in_line(line)
|
||||
if time and isinstance(time, datetime.datetime):
|
||||
return time.isoformat(sep='T', timespec='milliseconds').replace(
|
||||
'+00:00', '')
|
||||
|
||||
|
||||
def get_message(line):
|
||||
@@ -210,6 +247,15 @@ def open_file(path):
|
||||
return open(path, 'r')
|
||||
|
||||
|
||||
def get_file_info(config, build_file):
|
||||
yaml = YAML()
|
||||
with open_file(config) as f:
|
||||
config_files = yaml.load(f)
|
||||
for f in config_files["files"]:
|
||||
if f["name"].split('/')[-1] in build_file:
|
||||
return f
|
||||
|
||||
|
||||
def logline_iter(build_file):
|
||||
last_known_timestamp = None
|
||||
with open_file(build_file) as f:
|
||||
@@ -219,6 +265,8 @@ def logline_iter(build_file):
|
||||
ts = get_timestamp(line)
|
||||
if ts:
|
||||
last_known_timestamp = ts
|
||||
elif not last_known_timestamp and not ts:
|
||||
ts = datetime.datetime.utcnow().isoformat()
|
||||
else:
|
||||
ts = last_known_timestamp
|
||||
yield (ts, line)
|
||||
@@ -244,10 +292,10 @@ def send_to_es(build_file, es_fields, es_client, index, workers,
|
||||
chunk_size, doc_type):
|
||||
"""Send document to the Opensearch"""
|
||||
logging.info("Working on %s" % build_file)
|
||||
|
||||
try:
|
||||
docs = doc_iter(
|
||||
logline_iter(build_file),
|
||||
index, es_fields, doc_type, chunk_size)
|
||||
logline_iter(build_file), index, es_fields, doc_type, chunk_size)
|
||||
return helpers.bulk(es_client, docs)
|
||||
except opensearch_exceptions.TransportError as e:
|
||||
logging.critical("Can not send message to Opensearch. Error: %s" % e)
|
||||
@@ -283,6 +331,7 @@ def send(ready_directory, args, directory, index, workers):
|
||||
fields = copy.deepcopy(es_fields)
|
||||
fields["filename"] = build_file
|
||||
fields["log_url"] = fields["log_url"] + build_file
|
||||
fields['tags'] = get_file_info(args.config, build_file)
|
||||
send_status = send_to_es("%s/%s" % (build_dir, build_file),
|
||||
fields, es_client, index, workers,
|
||||
args.chunk_size, args.doc_type)
|
||||
|
||||
@@ -268,11 +268,12 @@ class _MockedPoolMapResult:
|
||||
|
||||
|
||||
class FakeArgs(object):
|
||||
def __init__(self, directory=None, host=None, port=None, username=None,
|
||||
password=None, index_prefix=None, index=None, doc_type=None,
|
||||
insecure=None, follow=None, workers=None, chunk_size=None,
|
||||
keep=None, debug=None):
|
||||
def __init__(self, config=None, directory=None, host=None, port=None,
|
||||
username=None, password=None, index_prefix=None, index=None,
|
||||
doc_type=None, insecure=None, follow=None, workers=None,
|
||||
chunk_size=None, keep=None, debug=None):
|
||||
|
||||
self.config = config
|
||||
self.directory = directory
|
||||
self.host = host
|
||||
self.port = port
|
||||
@@ -291,14 +292,16 @@ class FakeArgs(object):
|
||||
|
||||
class TestSender(base.TestCase):
|
||||
|
||||
@mock.patch('logscraper.logsender.get_file_info')
|
||||
@mock.patch('logscraper.logsender.remove_directory')
|
||||
@mock.patch('logscraper.logsender.send_to_es')
|
||||
@mock.patch('logscraper.logsender.get_build_information')
|
||||
@mock.patch('logscraper.logsender.get_es_client')
|
||||
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
|
||||
directory="/tmp/testdir", doc_type='_doc'))
|
||||
directory="/tmp/testdir", doc_type='_doc',
|
||||
config='config.yaml'))
|
||||
def test_send(self, mock_args, mock_es_client, mock_build_info,
|
||||
mock_send_to_es, mock_remove_dir):
|
||||
mock_send_to_es, mock_remove_dir, mock_info):
|
||||
build_uuid = '38bf2cdc947643c9bb04f11f40a0f211'
|
||||
build_files = ['job-result.txt']
|
||||
directory = '/tmp/testdir'
|
||||
@@ -306,6 +309,8 @@ class TestSender(base.TestCase):
|
||||
workers = 1
|
||||
mock_build_info.return_value = parsed_fields
|
||||
mock_es_client.return_value = 'fake_client_object'
|
||||
tags = ['test', 'info']
|
||||
mock_info.return_value = tags
|
||||
expected_fields = {
|
||||
'build_node': 'zuul-executor', 'build_name': 'openstack-tox-py39',
|
||||
'build_status': 'SUCCESS', 'project': 'openstack/neutron',
|
||||
@@ -319,7 +324,8 @@ class TestSender(base.TestCase):
|
||||
'log_url': 'https://somehost/829161/3/check/openstack-tox-py39/'
|
||||
'38bf2cd/job-result.txt',
|
||||
'tenant': 'openstack', 'zuul_executor': 'ze07.opendev.org',
|
||||
'filename': 'job-result.txt'
|
||||
'filename': 'job-result.txt',
|
||||
'tags': tags
|
||||
}
|
||||
args = logsender.get_arguments()
|
||||
mock_send_to_es.return_value = True
|
||||
@@ -330,6 +336,7 @@ class TestSender(base.TestCase):
|
||||
"%s/%s/job-result.txt" % (directory, build_uuid), expected_fields,
|
||||
'fake_client_object', index, workers, None, '_doc')
|
||||
|
||||
@mock.patch('logscraper.logsender.get_file_info')
|
||||
@mock.patch('logscraper.logsender.remove_directory')
|
||||
@mock.patch('logscraper.logsender.send_to_es')
|
||||
@mock.patch('logscraper.logsender.get_build_information')
|
||||
@@ -337,7 +344,7 @@ class TestSender(base.TestCase):
|
||||
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
|
||||
directory="/tmp/testdir", keep=True, doc_type="_doc"))
|
||||
def test_send_keep_dir(self, mock_args, mock_es_client, mock_build_info,
|
||||
mock_send_to_es, mock_remove_dir):
|
||||
mock_send_to_es, mock_remove_dir, mock_info):
|
||||
build_uuid = '38bf2cdc947643c9bb04f11f40a0f211'
|
||||
build_files = ['job-result.txt']
|
||||
directory = '/tmp/testdir'
|
||||
@@ -350,6 +357,7 @@ class TestSender(base.TestCase):
|
||||
workers)
|
||||
self.assertFalse(mock_remove_dir.called)
|
||||
|
||||
@mock.patch('logscraper.logsender.get_file_info')
|
||||
@mock.patch('logscraper.logsender.remove_directory')
|
||||
@mock.patch('logscraper.logsender.send_to_es')
|
||||
@mock.patch('logscraper.logsender.get_build_information')
|
||||
@@ -358,7 +366,7 @@ class TestSender(base.TestCase):
|
||||
directory="/tmp/testdir", keep=False, doc_type="_doc"))
|
||||
def test_send_error_keep_dir(self, mock_args, mock_es_client,
|
||||
mock_build_info, mock_send_to_es,
|
||||
mock_remove_dir):
|
||||
mock_remove_dir, mock_info):
|
||||
build_uuid = '38bf2cdc947643c9bb04f11f40a0f211'
|
||||
build_files = ['job-result.txt']
|
||||
directory = '/tmp/testdir'
|
||||
@@ -370,15 +378,17 @@ class TestSender(base.TestCase):
|
||||
workers)
|
||||
self.assertFalse(mock_remove_dir.called)
|
||||
|
||||
@mock.patch('logscraper.logsender.get_file_info')
|
||||
@mock.patch('logscraper.logsender.doc_iter')
|
||||
@mock.patch('logscraper.logsender.logline_iter')
|
||||
@mock.patch('opensearchpy.helpers.bulk')
|
||||
@mock.patch('logscraper.logsender.open_file')
|
||||
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
|
||||
directory="/tmp/testdir", index="myindex", workers=1,
|
||||
chunk_size=1000, doc_type="zuul"))
|
||||
chunk_size=1000, doc_type="zuul",
|
||||
config='config.yaml'))
|
||||
def test_send_to_es(self, mock_args, mock_text, mock_bulk, mock_doc_iter,
|
||||
mock_logline_chunk):
|
||||
mock_logline_chunk, mock_file_info):
|
||||
build_file = 'job-result.txt'
|
||||
es_fields = parsed_fields
|
||||
es_client = mock.Mock()
|
||||
@@ -474,15 +484,17 @@ class TestSender(base.TestCase):
|
||||
args.workers, args.chunk_size, args.doc_type)
|
||||
self.assertEqual(1, mock_bulk.call_count)
|
||||
|
||||
@mock.patch('logscraper.logsender.get_file_info')
|
||||
@mock.patch('logscraper.logsender.doc_iter')
|
||||
@mock.patch('logscraper.logsender.logline_iter')
|
||||
@mock.patch('opensearchpy.helpers.bulk')
|
||||
@mock.patch('logscraper.logsender.open_file')
|
||||
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
|
||||
directory="/tmp/testdir", index="myindex", workers=1,
|
||||
chunk_size=1000, doc_type="zuul"))
|
||||
chunk_size=1000, doc_type="zuul",
|
||||
config='test.yaml'))
|
||||
def test_send_to_es_error(self, mock_args, mock_text, mock_bulk,
|
||||
mock_logline, mock_doc_iter):
|
||||
mock_logline, mock_doc_iter, mock_file_info):
|
||||
build_file = 'job-result.txt'
|
||||
es_fields = parsed_fields
|
||||
es_client = mock.Mock()
|
||||
@@ -560,17 +572,17 @@ class TestSender(base.TestCase):
|
||||
self.assertEqual(expected_chunk, chunk_text)
|
||||
|
||||
def test_logline_iter(self):
|
||||
text = """2022-02-28 09:39:09.596010 | Job console starting...
|
||||
2022-02-28 09:39:09.610160 | Updating repositories
|
||||
2022-02-28 09:39:09.996235 | Preparing job workspace"""
|
||||
text = """2022-02-28 09:39:09.596 | Job console starting...
|
||||
2022-02-28 09:39:09.610 | Updating repositories
|
||||
2022-02-28 09:39:09.996 | Preparing job workspace"""
|
||||
|
||||
expected_data = [
|
||||
('2022-02-28T09:39:09.596000',
|
||||
'2022-02-28 09:39:09.596010 | Job console starting...\n'),
|
||||
('2022-02-28T09:39:09.610000',
|
||||
'2022-02-28 09:39:09.610160 | Updating repositories\n'),
|
||||
('2022-02-28T09:39:09.996000',
|
||||
'2022-02-28 09:39:09.996235 | Preparing job workspace')
|
||||
('2022-02-28T09:39:09.596',
|
||||
'2022-02-28 09:39:09.596 | Job console starting...\n'),
|
||||
('2022-02-28T09:39:09.610',
|
||||
'2022-02-28 09:39:09.610 | Updating repositories\n'),
|
||||
('2022-02-28T09:39:09.996',
|
||||
'2022-02-28 09:39:09.996 | Preparing job workspace')
|
||||
]
|
||||
readed_data = mock.mock_open(read_data=text)
|
||||
with mock.patch('builtins.open', readed_data) as mocked_open_file:
|
||||
@@ -598,10 +610,56 @@ class TestSender(base.TestCase):
|
||||
def test_get_timestamp(self):
|
||||
line_1 = "28-02-2022 09:44:58.839036 | Some message"
|
||||
line_2 = "2022-02-28 09:44:58.839036 | Other message"
|
||||
self.assertEqual(None, logsender.get_timestamp(line_1))
|
||||
self.assertEqual("2022-02-28T09:44:58.839000",
|
||||
self.assertEqual("2022-02-28T09:44:58.839",
|
||||
logsender.get_timestamp(line_1))
|
||||
self.assertEqual("2022-02-28T09:44:58.839",
|
||||
logsender.get_timestamp(line_2))
|
||||
|
||||
@mock.patch('logscraper.logsender.get_file_info')
|
||||
def test_get_timestamp_isoformat(self, mock_info):
|
||||
# logstash.log
|
||||
line_1 = "2022-03-21T08:39:18.220547Z | Last metadata expiration"
|
||||
self.assertEqual("2022-03-21T08:39:18.220",
|
||||
logsender.get_timestamp(line_1))
|
||||
|
||||
def test_get_timestamp_textformat(self):
|
||||
# syslog.log
|
||||
line_1 = "-- Logs begin at Mon 2022-03-21 09:22:16 UTC"
|
||||
line_2 = "Mar 21 09:33:23 fedora-rax-dfw-0028920567 sudo[2786]: zuul "
|
||||
# FIXME: the line_1 should be skipped
|
||||
self.assertEqual('2022-03-21T09:22:16.000',
|
||||
logsender.get_timestamp(line_1))
|
||||
self.assertEqual("2022-03-21T09:33:23.000",
|
||||
logsender.get_timestamp(line_2))
|
||||
|
||||
@mock.patch('ruamel.yaml.YAML.load')
|
||||
@mock.patch('logscraper.logsender.open_file')
|
||||
def test_get_file_info(self, mock_open_file, mock_yaml):
|
||||
config = {'files': [{
|
||||
'name': 'job-output.txt',
|
||||
'timeformat': r'[-0-9]{10}\s+[0-9.:]{12}',
|
||||
'tags': ['console', 'console.html']
|
||||
}, {'name': 'logs/undercloud/var/log/extra/logstash.txt',
|
||||
'timeformat': 'isoformat',
|
||||
'tags': ['console', 'postpci']}]}
|
||||
expected_output_1 = {
|
||||
'name': 'logs/undercloud/var/log/extra/logstash.txt',
|
||||
'timeformat': 'isoformat',
|
||||
'tags': ['console', 'postpci']
|
||||
}
|
||||
expected_output_2 = {
|
||||
'name': 'job-output.txt',
|
||||
'tags': ['console', 'console.html'],
|
||||
'timeformat': '[-0-9]{10}\\s+[0-9.:]{12}'
|
||||
}
|
||||
mock_yaml.return_value = config
|
||||
self.assertEqual(expected_output_1, logsender.get_file_info(
|
||||
config, './9e7bbfb1a4614bc4be06776658fa888f/logstash.txt'))
|
||||
self.assertEqual(expected_output_2, logsender.get_file_info(
|
||||
config, './9e7bbfb1a4614bc4be06776658fa888f/job-output.txt'))
|
||||
self.assertIsNone(logsender.get_file_info(
|
||||
config, './9e7bbfb1a4614bc4be06776658fa888f/somejob.txt'))
|
||||
|
||||
@mock.patch('logscraper.logsender.get_es_client')
|
||||
@mock.patch('argparse.ArgumentParser.parse_args', return_value=FakeArgs(
|
||||
index_prefix="my-index-", workers=2))
|
||||
|
||||
@@ -5,3 +5,5 @@ PyYAML<6.1 # MIT
|
||||
tenacity
|
||||
opensearch-py<=1.0.0 # Apache-2.0
|
||||
ruamel.yaml
|
||||
datefinder # MIT
|
||||
parsedatetime # Apache-2.0
|
||||
|
||||
Reference in New Issue
Block a user