From 6c292222f96192eaeaa93849da7fb2aa37245deb Mon Sep 17 00:00:00 2001 From: Tristan Cacqueray Date: Tue, 15 Mar 2022 15:15:59 +0000 Subject: [PATCH] Add BuildCache checkpoint to avoid missing long running builds This change mitigates an issue with the builds api that does not return builds by end_time, as previously thought. Change-Id: I28357cbe888649b89705511942db70fb26ba0dde --- README.rst | 2 +- doc/source/logscraper.rst | 10 +- logscraper/logscraper.py | 137 +++++++++++++++++++++++----- logscraper/tests/test_logscraper.py | 102 +++++++++++++++++---- logscraper/tests/test_logsender.py | 2 +- 5 files changed, 208 insertions(+), 45 deletions(-) diff --git a/README.rst b/README.rst index 546d395..e3c855f 100644 --- a/README.rst +++ b/README.rst @@ -98,7 +98,7 @@ To download logs: logscraper \ --zuul-api-url https://zuul.opendev.org/api/tenant/openstack \ - --checkpoint-file /tmp/results-checkpoint.txt \ + --checkpoint-file /tmp/results-checkpoint \ --worker 8 \ --max-skipped 100 \ --download \ diff --git a/doc/source/logscraper.rst b/doc/source/logscraper.rst index c96e6ab..3b32261 100644 --- a/doc/source/logscraper.rst +++ b/doc/source/logscraper.rst @@ -55,19 +55,19 @@ Example: .. code-block:: - logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /tmp/results-checkpoint.txt --follow + logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /tmp/results-checkpoint --follow * one shot on getting logs from `zuul` tenant: .. code-block:: - logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --checkpoint-file /tmp/zuul-result-timestamp.txt + logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --checkpoint-file /tmp/zuul-result-timestamp * periodically scrape logs from tenants: `openstack`, `zuul` and `local` .. 
code-block:: - logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --zuul-api-url https://zuul.opendev.org/api/tenant/local --checkpoint-file /tmp/someresults.txt --follow + logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --zuul-api-url https://zuul.opendev.org/api/tenant/local --checkpoint-file /tmp/someresults --follow * scrape logs from two defined job names: `tripleo-ci-centos-8-containers-multinode` and `openstack-tox-linters` for tenants: `openstack` and `local`: @@ -99,11 +99,11 @@ to the container, for example: .. code-block:: - docker run -v $(pwd):/checkpoint-dir:z -d logscraper logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint.txt --follow + docker run -v $(pwd):/checkpoint-dir:z -d logscraper logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint --follow In this example, logscraper will download log files to the /mnt/logscraper directory: .. 
code-block:: - docker run -v $(pwd):/checkpoint-dir:z -v /mnt/logscraper:/mnt/logscraper:z -d logscraper logscraper --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint.txt --directory /mnt/logscraper --download --follow + docker run -v $(pwd):/checkpoint-dir:z -v /mnt/logscraper:/mnt/logscraper:z -d logscraper logscraper --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint --directory /mnt/logscraper --download --follow diff --git a/logscraper/logscraper.py b/logscraper/logscraper.py index 0308701..c6a52f9 100755 --- a/logscraper/logscraper.py +++ b/logscraper/logscraper.py @@ -18,10 +18,48 @@ The goal is to push recent zuul builds into log gearman processor. [ CLI ] -> [ Config ] -> [ ZuulFetcher ] -> [ LogPublisher ] + + +# Zuul builds results are not sorted by end time. Here is a problematic +scenario: + +Neutron01 build starts at 00:00 +Many smolXX build starts and stops at 01:00 +Neutron01 build stops at 02:00 + + +When at 01:55 we query the /builds: + +- smol42 ends at 01:54 +- smol41 ends at 01:50 +- smol40 ends at 01:49 +- ... + + +When at 02:05 we query the /builds: + +- smol42 ends at 01:54 # already in build cache, skip +- smol41 ends at 01:50 # already in build cache, skip +- smol40 ends at 01:49 # already in build cache, skip +- ... +- neutron01 ends at 02:00 # not in build cache, get_last_job_results yield + +Question: when to stop the builds query? + +We could check that all the _id values got processed, but that can be tricky +when long running builds are interleaved with short ones. For example, the +scraper could keep track of the oldest _id and ensure it got them all. + +But instead, we'll always grab the last 1000 builds and process new builds. +This is not ideal, because we might miss builds if more than 1000 builds +happen between two queries. +But that will have to do until the zuul builds api can return builds sorted by +end_time. 
""" import argparse +import datetime import gear import itertools import json @@ -30,6 +68,7 @@ import multiprocessing import os import requests import socket +import sqlite3 import sys import time import yaml @@ -190,7 +229,7 @@ def get_arguments(): parser.add_argument("--max-skipped", help="How many job results should be " "checked until last uuid written in checkpoint file " "is founded", - default=500) + default=1000) parser.add_argument("--debug", help="Print more information", action="store_true") parser.add_argument("--download", help="Download logs and do not send " @@ -208,7 +247,6 @@ def get_arguments(): ############################################################################### class Config: def __init__(self, args, zuul_api_url, job_name=None): - self.checkpoint = None url_path = zuul_api_url.split("/") if url_path[-3] != "api" and url_path[-2] != "tenant": print( @@ -223,18 +261,74 @@ class Config: if job_name: self.filename = "%s-%s" % (self.filename, job_name) - try: - with open(self.filename) as f: - self.checkpoint = f.readline() - except Exception: - logging.exception("Can't load the checkpoint. Creating file") + self.build_cache = BuildCache(self.filename) - def save(self, job_uuid): + def save(self): try: - with open(self.filename, 'w') as f: - f.write(job_uuid) + self.build_cache.save() except Exception as e: - raise("Can not write status to the checkpoint file %s" % e) + logging.critical("Can not write status to the build_cache " + "file %s" % e) + + +class BuildCache: + def __init__(self, filepath=None): + self.builds = dict() + + if not filepath: + logging.critical("No cache file provided. 
Can not continue") + sys.exit(1) + + self.create_db(filepath) + self.create_table() + + # clean builds that are older than 1 day + self.clean() + + rows = self.fetch_data() + if rows: + for r in rows: + uid, date = r + self.builds[uid] = date + + def create_db(self, filepath): + try: + self.connection = sqlite3.connect(filepath) + self.cursor = self.connection.cursor() + except Exception as e: + logging.critical("Can not create cache DB! Error %s" % e) + + def create_table(self): + try: + self.cursor.execute("CREATE TABLE IF NOT EXISTS logscraper (" + "uid INTEGER, timestamp INTEGER)") + except sqlite3.OperationalError: + logging.debug("The logscraper table already exists") + + def fetch_data(self): + try: + return self.cursor.execute( + "SELECT uid, timestamp FROM logscraper").fetchall() + except Exception as e: + logging.exception("Can't get data from cache file! Error %s" % e) + + def add(self, uid): + self.builds[uid] = int(datetime.datetime.now().timestamp()) + + def clean(self): + # Remove old builds + yesterday = datetime.datetime.now() - datetime.timedelta(days=1) + self.cursor.execute("DELETE FROM logscraper WHERE timestamp < %s" % + yesterday.timestamp()) + self.connection.commit() + + def save(self): + self.cursor.executemany('INSERT INTO logscraper VALUES (?,?)', + list(self.builds.items())) + self.connection.commit() + + def contains(self, uid): + return uid in self.builds ############################################################################### @@ -396,17 +490,18 @@ def filter_available_jobs(zuul_api_url, job_names, insecure): return filtered_jobs -def get_last_job_results(zuul_url, insecure, max_skipped, last_uuid, +def get_last_job_results(zuul_url, insecure, max_builds, build_cache, job_name): """Yield builds until we find the last uuid.""" count = 0 for build in get_builds(zuul_url, insecure, job_name): - if count > int(max_skipped): - break - if build["uuid"] == last_uuid: - break - yield build count += 1 + if count > int(max_builds): + 
break + if build_cache.contains(build["_id"]): + continue + build_cache.add(build["_id"]) + yield build ############################################################################### @@ -551,14 +646,14 @@ def check_connection(logstash_url): def run_scraping(args, zuul_api_url, job_name=None): """Get latest job results and push them into log processing service. - On the end, write newest uuid into checkpoint file, so in the future - script will not push log duplication. + On the end, write build_cache file, so in the future + script will not push duplicate build. """ config = Config(args, zuul_api_url, job_name) builds = [] for build in get_last_job_results(zuul_api_url, args.insecure, - args.max_skipped, config.checkpoint, + args.max_skipped, config.build_cache, job_name): logging.debug("Working on build %s" % build['uuid']) # add missing informations @@ -578,7 +673,7 @@ def run_scraping(args, zuul_api_url, job_name=None): try: pool.map(run_build, builds) finally: - config.save(builds[0]['uuid']) + config.save() def run(args): diff --git a/logscraper/tests/test_logscraper.py b/logscraper/tests/test_logscraper.py index 336649c..d90a584 100644 --- a/logscraper/tests/test_logscraper.py +++ b/logscraper/tests/test_logscraper.py @@ -14,7 +14,9 @@ # License for the specific language governing permissions and limitations # under the License. 
+import datetime import json +import tempfile from logscraper import logscraper from logscraper.tests import base @@ -236,12 +238,19 @@ class TestScraper(base.TestCase): args.logstash_url) @mock.patch('logscraper.logscraper.get_builds', - return_value=iter([{'uuid': '1234'}])) - def test_get_last_job_results(self, mock_get_builds): + return_value=iter([{'_id': '1234'}])) + @mock.patch('argparse.ArgumentParser.parse_args') + def test_get_last_job_results(self, mock_args, mock_get_builds): + mock_args.return_value = FakeArgs( + zuul_api_url='http://somehost.com/api/tenant/sometenant', + gearman_server='localhost', + checkpoint_file='/tmp/testfile') + args = logscraper.get_arguments() + some_config = logscraper.Config(args, args.zuul_api_url) job_result = logscraper.get_last_job_results( 'http://somehost.com/api/tenant/tenant1', False, '1234', - 'someuuid', None) - self.assertEqual([{'uuid': '1234'}], list(job_result)) + some_config.build_cache, None) + self.assertEqual([{'_id': '1234'}], list(job_result)) self.assertEqual(1, mock_get_builds.call_count) @mock.patch('logscraper.logscraper.get_builds', @@ -397,7 +406,7 @@ class TestScraper(base.TestCase): class TestConfig(base.TestCase): @mock.patch('sys.exit') - def test_save(self, mock_sys): + def test_config_object(self, mock_sys): # Assume that url is wrong so it raise IndexError with mock.patch('argparse.ArgumentParser.parse_args') as mock_args: mock_args.return_value = FakeArgs( @@ -415,19 +424,20 @@ class TestConfig(base.TestCase): logscraper.Config(args, args.zuul_api_url) mock_sys.assert_called() + @mock.patch('logscraper.logscraper.BuildCache.save') + @mock.patch('logscraper.logscraper.BuildCache.clean') + @mock.patch('argparse.ArgumentParser.parse_args') + def test_save(self, mock_args, mock_clean, mock_save): # correct url without job name - with mock.patch('argparse.ArgumentParser.parse_args') as mock_args: - mock_args.return_value = FakeArgs( - zuul_api_url='http://somehost.com/api/tenant/sometenant', - 
gearman_server='localhost', - checkpoint_file='/tmp/testfile') - args = logscraper.get_arguments() - with mock.patch('builtins.open', - new_callable=mock.mock_open() - ) as mock_file: - some_config = logscraper.Config(args, args.zuul_api_url) - some_config.save('123412312341234') - mock_file.assert_called_with('/tmp/testfile-sometenant', 'w') + mock_args.return_value = FakeArgs( + zuul_api_url='http://somehost.com/api/tenant/sometenant', + gearman_server='localhost', + checkpoint_file='/tmp/testfile') + args = logscraper.get_arguments() + some_config = logscraper.Config(args, args.zuul_api_url) + some_config.save() + mock_clean.assert_called_once() + mock_save.assert_called_once() class TestLogMatcher(base.TestCase): @@ -526,3 +536,61 @@ class TestLogMatcher(base.TestCase): expected_gear_job, json.loads(mock_gear_job.call_args.args[1].decode('utf-8')) ) + + +class TestBuildCache(base.TestCase): + + @mock.patch('sqlite3.connect', return_value=mock.MagicMock()) + def test_create_db(self, mock_connect): + filename = '/tmp/somefile' + logscraper.BuildCache(filename) + mock_connect.assert_called_with(filename) + mock_connect.return_value.cursor.assert_called_once() + + @mock.patch('sqlite3.connect') + def test_create_table(self, mock_connect): + tmp_dir = tempfile.mkdtemp() + filename = '%s/testfile' % tmp_dir + logscraper.BuildCache(filename) + mock_execute = mock_connect.return_value.cursor.return_value.execute + mock_execute.assert_called() + self.assertEqual('CREATE TABLE IF NOT EXISTS logscraper (uid INTEGER, ' + 'timestamp INTEGER)', + mock_execute.call_args_list[0].args[0]) + + @mock.patch('sqlite3.connect') + def test_fetch_data(self, mock_connect): + tmp_dir = tempfile.mkdtemp() + filename = '%s/testfile' % tmp_dir + logscraper.BuildCache(filename) + mock_execute = mock_connect.return_value.cursor.return_value.execute + mock_execute.assert_called() + self.assertEqual('SELECT uid, timestamp FROM logscraper', + mock_execute.call_args_list[2].args[0]) + + def 
test_clean(self): + # add old data + tmp_dir = tempfile.mkdtemp() + filename = '%s/testfile' % tmp_dir + cache = logscraper.BuildCache(filename) + current_build = {'ffeeddccbbaa': datetime.datetime.now().timestamp()} + cache.builds['aabbccddeeff'] = 1647131633 + cache.builds.update(current_build) + cache.save() + # check cleanup + cache = logscraper.BuildCache(filename) + cache.clean() + self.assertEqual(current_build, cache.builds) + + @mock.patch('sqlite3.connect') + def test_save(self, mock_connect): + tmp_dir = tempfile.mkdtemp() + filename = '%s/testfile' % tmp_dir + cache = logscraper.BuildCache(filename) + cache.builds = {'ffeeddccbbaa': datetime.datetime.now().timestamp()} + cache.save() + mock_many = mock_connect.return_value.cursor.return_value.executemany + mock_many.assert_called() + expected_call = ('INSERT INTO logscraper VALUES (?,?)', + list(cache.builds.items())) + self.assertEqual(expected_call, mock_many.call_args_list[0].args) diff --git a/logscraper/tests/test_logsender.py b/logscraper/tests/test_logsender.py index b8721ce..b40cf55 100755 --- a/logscraper/tests/test_logsender.py +++ b/logscraper/tests/test_logsender.py @@ -27,7 +27,7 @@ buildinfo = """ _id: 17428524 branch: master build_args: - checkpoint_file: /tmp/results-checkpoint.txt + checkpoint_file: /tmp/results-checkpoint debug: false directory: /tmp/logscraper download: true