Merge "Add BuildCache checkpoint to avoid missing long running build"
This commit is contained in:
@@ -98,7 +98,7 @@ To download logs:
|
||||
|
||||
logscraper \
|
||||
--zuul-api-url https://zuul.opendev.org/api/tenant/openstack \
|
||||
--checkpoint-file /tmp/results-checkpoint.txt \
|
||||
--checkpoint-file /tmp/results-checkpoint \
|
||||
--worker 8 \
|
||||
--max-skipped 100 \
|
||||
--download \
|
||||
|
||||
@@ -55,19 +55,19 @@ Example:
|
||||
|
||||
.. code-block::
|
||||
|
||||
logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /tmp/results-checkpoint.txt --follow
|
||||
logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /tmp/results-checkpoint --follow
|
||||
|
||||
* one shot on getting logs from `zuul` tenant:
|
||||
|
||||
.. code-block::
|
||||
|
||||
logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --checkpoint-file /tmp/zuul-result-timestamp.txt
|
||||
logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --checkpoint-file /tmp/zuul-result-timestamp
|
||||
|
||||
* periodically scrape logs from tenants: `openstack`, `zuul` and `local`
|
||||
|
||||
.. code-block::
|
||||
|
||||
logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --zuul-api-url https://zuul.opendev.org/api/tenant/local --checkpoint-file /tmp/someresults.txt --follow
|
||||
logscraper --gearman-server localhost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --zuul-api-url https://zuul.opendev.org/api/tenant/zuul --zuul-api-url https://zuul.opendev.org/api/tenant/local --checkpoint-file /tmp/someresults --follow
|
||||
|
||||
* scrape logs from two defined job names: `tripleo-ci-centos-8-containers-multinode` and `openstack-tox-linters` for tenants: `openstack` and `local`:
|
||||
|
||||
@@ -99,11 +99,11 @@ to the container, for example:
|
||||
|
||||
.. code-block::
|
||||
|
||||
docker run -v $(pwd):/checkpoint-dir:z -d logscraper logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint.txt --follow
|
||||
docker run -v $(pwd):/checkpoint-dir:z -d logscraper logscraper --gearman-server somehost --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint --follow
|
||||
|
||||
|
||||
In this example, logscraper will download log files to the /mnt/logscraper directory:
|
||||
|
||||
.. code-block::
|
||||
|
||||
docker run -v $(pwd):/checkpoint-dir:z -v /mnt/logscraper:/mnt/logscraper:z -d logscraper logscraper --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint.txt --directory /mnt/logscraper --download --follow
|
||||
docker run -v $(pwd):/checkpoint-dir:z -v /mnt/logscraper:/mnt/logscraper:z -d logscraper logscraper --zuul-api-url https://zuul.opendev.org/api/tenant/openstack --checkpoint-file /checkpoint-dir/checkpoint --directory /mnt/logscraper --download --follow
|
||||
|
||||
@@ -18,10 +18,48 @@
|
||||
The goal is to push recent zuul builds into log gearman processor.
|
||||
|
||||
[ CLI ] -> [ Config ] -> [ ZuulFetcher ] -> [ LogPublisher ]
|
||||
|
||||
|
||||
# Zuul builds results are not sorted by end time. Here is a problematic
|
||||
scenario:
|
||||
|
||||
Neutron01 build starts at 00:00
|
||||
Many smolXX build starts and stops at 01:00
|
||||
Neutron01 build stops at 02:00
|
||||
|
||||
|
||||
When at 01:55 we query the /builds:
|
||||
|
||||
- smol42 ends at 01:54
|
||||
- smol41 ends at 01:50
|
||||
- smol40 ends at 01:49
|
||||
- ...
|
||||
|
||||
|
||||
When at 02:05 we query the /builds:
|
||||
|
||||
- smol42 ends at 01:54 # already in build cache, skip
|
||||
- smol41 ends at 01:50 # already in build cache, skip
|
||||
- smol40 ends at 01:49 # already in build cache, skip
|
||||
- ...
|
||||
- neutron01 ends at 02:00 # not in build cache, get_last_job_results yield
|
||||
|
||||
Question: when to stop the builds query?
|
||||
|
||||
We could check that all the _id value got processed, but that can be tricky
|
||||
when long running builds are interleaved with short ones. For example, the
|
||||
scraper could keep track of the oldest _id and ensure it got them all.
|
||||
|
||||
But instead, we'll always grab the last 1000 builds and process new builds.
|
||||
This is not ideal, because we might miss builds if more than 1000 builds
|
||||
happen between two queries.
|
||||
But that will have to do until the zuul builds API can return builds sorted by
|
||||
end_time.
|
||||
"""
|
||||
|
||||
|
||||
import argparse
|
||||
import datetime
|
||||
import gear
|
||||
import itertools
|
||||
import json
|
||||
@@ -30,6 +68,7 @@ import multiprocessing
|
||||
import os
|
||||
import requests
|
||||
import socket
|
||||
import sqlite3
|
||||
import sys
|
||||
import time
|
||||
import yaml
|
||||
@@ -190,7 +229,7 @@ def get_arguments():
|
||||
parser.add_argument("--max-skipped", help="How many job results should be "
|
||||
"checked until last uuid written in checkpoint file "
|
||||
"is founded",
|
||||
default=500)
|
||||
default=1000)
|
||||
parser.add_argument("--debug", help="Print more information",
|
||||
action="store_true")
|
||||
parser.add_argument("--download", help="Download logs and do not send "
|
||||
@@ -208,7 +247,6 @@ def get_arguments():
|
||||
###############################################################################
|
||||
class Config:
|
||||
def __init__(self, args, zuul_api_url, job_name=None):
|
||||
self.checkpoint = None
|
||||
url_path = zuul_api_url.split("/")
|
||||
if url_path[-3] != "api" and url_path[-2] != "tenant":
|
||||
print(
|
||||
@@ -223,18 +261,74 @@ class Config:
|
||||
if job_name:
|
||||
self.filename = "%s-%s" % (self.filename, job_name)
|
||||
|
||||
try:
|
||||
with open(self.filename) as f:
|
||||
self.checkpoint = f.readline()
|
||||
except Exception:
|
||||
logging.exception("Can't load the checkpoint. Creating file")
|
||||
self.build_cache = BuildCache(self.filename)
|
||||
|
||||
def save(self, job_uuid):
|
||||
def save(self):
|
||||
try:
|
||||
with open(self.filename, 'w') as f:
|
||||
f.write(job_uuid)
|
||||
self.build_cache.save()
|
||||
except Exception as e:
|
||||
raise("Can not write status to the checkpoint file %s" % e)
|
||||
logging.critical("Can not write status to the build_cache "
|
||||
"file %s" % e)
|
||||
|
||||
|
||||
class BuildCache:
    """Persistent cache of already-processed build ids.

    The cache is backed by a sqlite database at ``filepath`` so that a
    restarted scraper does not push duplicate builds. Entries older
    than one day are dropped when the cache is loaded.
    """

    def __init__(self, filepath=None):
        # In-memory view of the persisted cache: {uid: unix timestamp}.
        self.builds = dict()

        if not filepath:
            logging.critical("No cache file provided. Can not continue")
            sys.exit(1)

        self.create_db(filepath)
        self.create_table()

        # clean builds that are older than 1 day
        self.clean()

        rows = self.fetch_data()
        if rows:
            for r in rows:
                uid, date = r
                self.builds[uid] = date

    def create_db(self, filepath):
        """Open (or create) the sqlite database and keep a cursor."""
        try:
            self.connection = sqlite3.connect(filepath)
            self.cursor = self.connection.cursor()
        except Exception as e:
            logging.critical("Can not create cache DB! Error %s" % e)

    def create_table(self):
        """Create the logscraper table if it does not exist yet."""
        try:
            self.cursor.execute("CREATE TABLE IF NOT EXISTS logscraper ("
                                "uid INTEGER, timestamp INTEGER)")
        except sqlite3.OperationalError:
            logging.debug("The logscraper table already exists")

    def fetch_data(self):
        """Return all (uid, timestamp) rows, or None when the read fails."""
        try:
            return self.cursor.execute(
                "SELECT uid, timestamp FROM logscraper").fetchall()
        except Exception as e:
            logging.exception("Can't get data from cache file! Error %s" % e)

    def add(self, uid):
        """Record uid as processed at the current time."""
        self.builds[uid] = int(datetime.datetime.now().timestamp())

    def clean(self):
        # Remove builds older than one day so the cache does not grow
        # without bound. Use a parameterized query instead of string
        # interpolation.
        yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
        self.cursor.execute("DELETE FROM logscraper WHERE timestamp < ?",
                            (yesterday.timestamp(),))
        self.connection.commit()

    def save(self):
        """Persist the in-memory cache to the database.

        The table is rewritten from self.builds: a plain INSERT of every
        entry on each save would duplicate rows that were already
        persisted by a previous save.
        """
        self.cursor.execute("DELETE FROM logscraper")
        self.cursor.executemany('INSERT INTO logscraper VALUES (?,?)',
                                list(self.builds.items()))
        self.connection.commit()

    def contains(self, uid):
        """Return True when uid was already processed."""
        return uid in self.builds
||||
###############################################################################
|
||||
@@ -396,17 +490,18 @@ def filter_available_jobs(zuul_api_url, job_names, insecure):
|
||||
return filtered_jobs
|
||||
|
||||
|
||||
def get_last_job_results(zuul_url, insecure, max_skipped, last_uuid,
|
||||
def get_last_job_results(zuul_url, insecure, max_builds, build_cache,
|
||||
job_name):
|
||||
"""Yield builds until we find the last uuid."""
|
||||
count = 0
|
||||
for build in get_builds(zuul_url, insecure, job_name):
|
||||
if count > int(max_skipped):
|
||||
break
|
||||
if build["uuid"] == last_uuid:
|
||||
break
|
||||
yield build
|
||||
count += 1
|
||||
if count > int(max_builds):
|
||||
break
|
||||
if build_cache.contains(build["_id"]):
|
||||
continue
|
||||
build_cache.add(build["_id"])
|
||||
yield build
|
||||
|
||||
|
||||
###############################################################################
|
||||
@@ -551,14 +646,14 @@ def check_connection(logstash_url):
|
||||
def run_scraping(args, zuul_api_url, job_name=None):
|
||||
"""Get latest job results and push them into log processing service.
|
||||
|
||||
On the end, write newest uuid into checkpoint file, so in the future
|
||||
script will not push log duplication.
|
||||
At the end, write the build_cache file, so in the future
|
||||
the script will not push duplicate builds.
|
||||
"""
|
||||
config = Config(args, zuul_api_url, job_name)
|
||||
|
||||
builds = []
|
||||
for build in get_last_job_results(zuul_api_url, args.insecure,
|
||||
args.max_skipped, config.checkpoint,
|
||||
args.max_skipped, config.build_cache,
|
||||
job_name):
|
||||
logging.debug("Working on build %s" % build['uuid'])
|
||||
# add missing informations
|
||||
@@ -578,7 +673,7 @@ def run_scraping(args, zuul_api_url, job_name=None):
|
||||
try:
|
||||
pool.map(run_build, builds)
|
||||
finally:
|
||||
config.save(builds[0]['uuid'])
|
||||
config.save()
|
||||
|
||||
|
||||
def run(args):
|
||||
|
||||
@@ -14,7 +14,9 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import json
|
||||
import tempfile
|
||||
|
||||
from logscraper import logscraper
|
||||
from logscraper.tests import base
|
||||
@@ -236,12 +238,19 @@ class TestScraper(base.TestCase):
|
||||
args.logstash_url)
|
||||
|
||||
@mock.patch('logscraper.logscraper.get_builds',
|
||||
return_value=iter([{'uuid': '1234'}]))
|
||||
def test_get_last_job_results(self, mock_get_builds):
|
||||
return_value=iter([{'_id': '1234'}]))
|
||||
@mock.patch('argparse.ArgumentParser.parse_args')
|
||||
def test_get_last_job_results(self, mock_args, mock_get_builds):
|
||||
mock_args.return_value = FakeArgs(
|
||||
zuul_api_url='http://somehost.com/api/tenant/sometenant',
|
||||
gearman_server='localhost',
|
||||
checkpoint_file='/tmp/testfile')
|
||||
args = logscraper.get_arguments()
|
||||
some_config = logscraper.Config(args, args.zuul_api_url)
|
||||
job_result = logscraper.get_last_job_results(
|
||||
'http://somehost.com/api/tenant/tenant1', False, '1234',
|
||||
'someuuid', None)
|
||||
self.assertEqual([{'uuid': '1234'}], list(job_result))
|
||||
some_config.build_cache, None)
|
||||
self.assertEqual([{'_id': '1234'}], list(job_result))
|
||||
self.assertEqual(1, mock_get_builds.call_count)
|
||||
|
||||
@mock.patch('logscraper.logscraper.get_builds',
|
||||
@@ -397,7 +406,7 @@ class TestScraper(base.TestCase):
|
||||
|
||||
class TestConfig(base.TestCase):
|
||||
@mock.patch('sys.exit')
|
||||
def test_save(self, mock_sys):
|
||||
def test_config_object(self, mock_sys):
|
||||
# Assume that url is wrong so it raise IndexError
|
||||
with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
|
||||
mock_args.return_value = FakeArgs(
|
||||
@@ -415,19 +424,20 @@ class TestConfig(base.TestCase):
|
||||
logscraper.Config(args, args.zuul_api_url)
|
||||
mock_sys.assert_called()
|
||||
|
||||
@mock.patch('logscraper.logscraper.BuildCache.save')
|
||||
@mock.patch('logscraper.logscraper.BuildCache.clean')
|
||||
@mock.patch('argparse.ArgumentParser.parse_args')
|
||||
def test_save(self, mock_args, mock_clean, mock_save):
|
||||
# correct url without job name
|
||||
with mock.patch('argparse.ArgumentParser.parse_args') as mock_args:
|
||||
mock_args.return_value = FakeArgs(
|
||||
zuul_api_url='http://somehost.com/api/tenant/sometenant',
|
||||
gearman_server='localhost',
|
||||
checkpoint_file='/tmp/testfile')
|
||||
args = logscraper.get_arguments()
|
||||
with mock.patch('builtins.open',
|
||||
new_callable=mock.mock_open()
|
||||
) as mock_file:
|
||||
some_config = logscraper.Config(args, args.zuul_api_url)
|
||||
some_config.save('123412312341234')
|
||||
mock_file.assert_called_with('/tmp/testfile-sometenant', 'w')
|
||||
mock_args.return_value = FakeArgs(
|
||||
zuul_api_url='http://somehost.com/api/tenant/sometenant',
|
||||
gearman_server='localhost',
|
||||
checkpoint_file='/tmp/testfile')
|
||||
args = logscraper.get_arguments()
|
||||
some_config = logscraper.Config(args, args.zuul_api_url)
|
||||
some_config.save()
|
||||
mock_clean.assert_called_once()
|
||||
mock_save.assert_called_once()
|
||||
|
||||
|
||||
class TestLogMatcher(base.TestCase):
|
||||
@@ -526,3 +536,61 @@ class TestLogMatcher(base.TestCase):
|
||||
expected_gear_job,
|
||||
json.loads(mock_gear_job.call_args.args[1].decode('utf-8'))
|
||||
)
|
||||
|
||||
|
||||
class TestBuildCache(base.TestCase):
|
||||
|
||||
@mock.patch('sqlite3.connect', return_value=mock.MagicMock())
|
||||
def test_create_db(self, mock_connect):
|
||||
filename = '/tmp/somefile'
|
||||
logscraper.BuildCache(filename)
|
||||
mock_connect.assert_called_with(filename)
|
||||
mock_connect.return_value.cursor.assert_called_once()
|
||||
|
||||
@mock.patch('sqlite3.connect')
|
||||
def test_create_table(self, mock_connect):
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
filename = '%s/testfile' % tmp_dir
|
||||
logscraper.BuildCache(filename)
|
||||
mock_execute = mock_connect.return_value.cursor.return_value.execute
|
||||
mock_execute.assert_called()
|
||||
self.assertEqual('CREATE TABLE IF NOT EXISTS logscraper (uid INTEGER, '
|
||||
'timestamp INTEGER)',
|
||||
mock_execute.call_args_list[0].args[0])
|
||||
|
||||
@mock.patch('sqlite3.connect')
|
||||
def test_fetch_data(self, mock_connect):
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
filename = '%s/testfile' % tmp_dir
|
||||
logscraper.BuildCache(filename)
|
||||
mock_execute = mock_connect.return_value.cursor.return_value.execute
|
||||
mock_execute.assert_called()
|
||||
self.assertEqual('SELECT uid, timestamp FROM logscraper',
|
||||
mock_execute.call_args_list[2].args[0])
|
||||
|
||||
def test_clean(self):
|
||||
# add old data
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
filename = '%s/testfile' % tmp_dir
|
||||
cache = logscraper.BuildCache(filename)
|
||||
current_build = {'ffeeddccbbaa': datetime.datetime.now().timestamp()}
|
||||
cache.builds['aabbccddeeff'] = 1647131633
|
||||
cache.builds.update(current_build)
|
||||
cache.save()
|
||||
# check cleanup
|
||||
cache = logscraper.BuildCache(filename)
|
||||
cache.clean()
|
||||
self.assertEqual(current_build, cache.builds)
|
||||
|
||||
@mock.patch('sqlite3.connect')
|
||||
def test_save(self, mock_connect):
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
filename = '%s/testfile' % tmp_dir
|
||||
cache = logscraper.BuildCache(filename)
|
||||
cache.builds = {'ffeeddccbbaa': datetime.datetime.now().timestamp()}
|
||||
cache.save()
|
||||
mock_many = mock_connect.return_value.cursor.return_value.executemany
|
||||
mock_many.assert_called()
|
||||
expected_call = ('INSERT INTO logscraper VALUES (?,?)',
|
||||
list(cache.builds.items()))
|
||||
self.assertEqual(expected_call, mock_many.call_args_list[0].args)
|
||||
|
||||
@@ -28,7 +28,7 @@ buildinfo = """
|
||||
_id: 17428524
|
||||
branch: master
|
||||
build_args:
|
||||
checkpoint_file: /tmp/results-checkpoint.txt
|
||||
checkpoint_file: /tmp/results-checkpoint
|
||||
debug: false
|
||||
directory: /tmp/logscraper
|
||||
download: true
|
||||
|
||||
Reference in New Issue
Block a user