Remove time database

We can obtain the same information from the SQL database now, so
do that and remove the filesystem-based time database.  This will
help support multiple schedulers (as they will all have access to
the same data).

Nothing in the scheduler uses the state directory anymore, so clean
up the docs around that.  The executor still has a state dir where
it may install ansible-related files.

The SQL query was rather slow in practice because it created a
temporary table since it was filtering mostly by buildset fields
then sorting by build.id.  We can sort by buildset.id and get nearly
the same results (equally valid from our perspective) much faster.

In some configurations under postgres, we may see a performance
variation in the run-time of the query.  In order to keep the time
estimation out of the critical path of job launches, we perform
the SQL query asynchronously.  We may be able to remove this added
bit of complexity once the scale-out-scheduler work is finished
(and/or we we further define/restrict our database requirements).

Change-Id: Id3c64be7a05c9edc849e698200411ad436a1334d
This commit is contained in:
James E. Blair 2021-09-13 16:20:01 -07:00
parent 0928c39793
commit 7d7d2f9f2a
11 changed files with 163 additions and 202 deletions

View File

@ -353,11 +353,6 @@ The following sections of ``zuul.conf`` are used by the scheduler:
Path to PID lock file.
.. attr:: state_dir
:default: /var/lib/zuul
Path to directory in which Zuul should save its state.
.. attr:: relative_priority
:default: False

View File

@ -492,6 +492,15 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
The size of the current connection event queue.
.. stat:: time_query
:type: timer
Each time the scheduler performs a query against the SQL
database in order to determine an estimated time for a job, it
emits this timer of the duration of the query. Note this is a
performance metric of how long the SQL query takes; it is not
the estimated time value itself.
.. stat:: zuul.geard
Gearman job distribution statistics. Gearman jobs encompass the

View File

@ -25,7 +25,6 @@ password=secret
tenant_config=/etc/zuul/main.yaml
log_config=/etc/zuul/logging.conf
pidfile=/var/run/zuul/zuul.pid
state_dir=/var/lib/zuul
prometheus_port=9091
;prometheus_addr=0.0.0.0

View File

@ -0,0 +1,10 @@
---
upgrade:
- |
The scheduler time database has been removed. This was stored in
the scheduler state directory, typically ``/var/lib/zuul/times``.
The entire state directory on the scheduler is no longer used and
may now be removed.
Zuul now derives its estimated build duration times from the SQL
database.

View File

@ -341,87 +341,6 @@ class TestJob(BaseTestCase):
item.freezeJobGraph(self.layout)
class TestJobTimeData(BaseTestCase):
def setUp(self):
super(TestJobTimeData, self).setUp()
self.tmp_root = self.useFixture(fixtures.TempDir(
rootdir=os.environ.get("ZUUL_TEST_ROOT"))
).path
def test_empty_timedata(self):
path = os.path.join(self.tmp_root, 'job-name')
self.assertFalse(os.path.exists(path))
self.assertFalse(os.path.exists(path + '.tmp'))
td = model.JobTimeData(path)
self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
def test_save_reload(self):
path = os.path.join(self.tmp_root, 'job-name')
self.assertFalse(os.path.exists(path))
self.assertFalse(os.path.exists(path + '.tmp'))
td = model.JobTimeData(path)
self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
success_times = []
failure_times = []
results = []
for x in range(10):
success_times.append(int(random.random() * 1000))
failure_times.append(int(random.random() * 1000))
results.append(0)
results.append(1)
random.shuffle(results)
s = f = 0
for result in results:
if result:
td.add(failure_times[f], 'FAILURE')
f += 1
else:
td.add(success_times[s], 'SUCCESS')
s += 1
self.assertEqual(td.success_times, success_times)
self.assertEqual(td.failure_times, failure_times)
self.assertEqual(td.results, results[10:])
td.save()
self.assertTrue(os.path.exists(path))
self.assertFalse(os.path.exists(path + '.tmp'))
td = model.JobTimeData(path)
td.load()
self.assertEqual(td.success_times, success_times)
self.assertEqual(td.failure_times, failure_times)
self.assertEqual(td.results, results[10:])
class TestTimeDataBase(BaseTestCase):
def setUp(self):
super(TestTimeDataBase, self).setUp()
self.tmp_root = self.useFixture(fixtures.TempDir(
rootdir=os.environ.get("ZUUL_TEST_ROOT"))
).path
self.db = model.TimeDataBase(self.tmp_root)
def test_timedatabase(self):
pipeline = Dummy(tenant=Dummy(name='test-tenant'))
change = Dummy(project=Dummy(canonical_name='git.example.com/foo/bar'))
job = Dummy(name='job-name')
item = Dummy(pipeline=pipeline,
change=change)
build = Dummy(build_set=Dummy(item=item),
job=job)
self.assertEqual(self.db.getEstimatedTime(build), 0)
self.db.update(build, 50, 'SUCCESS')
self.assertEqual(self.db.getEstimatedTime(build), 50)
self.db.update(build, 100, 'SUCCESS')
self.assertEqual(self.db.getEstimatedTime(build), 75)
for x in range(10):
self.db.update(build, 100, 'SUCCESS')
self.assertEqual(self.db.getEstimatedTime(build), 100)
class TestGraph(BaseTestCase):
def test_job_graph_disallows_multiple_jobs_with_same_name(self):
graph = model.JobGraph()

View File

@ -59,9 +59,11 @@ class DatabaseSession(object):
def getBuilds(self, tenant=None, project=None, pipeline=None,
change=None, branch=None, patchset=None, ref=None,
newrev=None, event_id=None, uuid=None, job_name=None,
voting=None, nodeset=None, result=None, provides=None,
final=None, held=None, complete=None, limit=50, offset=0):
newrev=None, event_id=None, uuid=None,
job_name=None, voting=None, nodeset=None,
result=None, provides=None, final=None, held=None,
complete=None, sort_by_buildset=False, limit=50,
offset=0):
build_table = self.connection.zuul_build_table
buildset_table = self.connection.zuul_buildset_table
@ -111,9 +113,14 @@ class DatabaseSession(object):
q = self.listFilter(q, provides_table.c.name, provides)
q = self.listFilter(q, build_table.c.held, held)
q = q.order_by(build_table.c.id.desc()).\
limit(limit).\
offset(offset)
if sort_by_buildset:
# If we don't need the builds to be strictly ordered, this
# query can be much faster as it may avoid the use of a
# temporary table.
q = q.order_by(buildset_table.c.id.desc())
else:
q = q.order_by(build_table.c.id.desc())
q = q.limit(limit).offset(offset)
try:
return q.all()
@ -355,6 +362,15 @@ class SQLConnection(BaseConnection):
sa.Index(self.table_prefix + 'uuid_buildset_id_idx',
uuid, buildset_id)
@property
def duration(self):
if self.start_time and self.end_time:
return max(0.0,
(self.end_time -
self.start_time).total_seconds())
else:
return None
def createArtifact(self, *args, **kw):
session = orm.session.Session.object_session(self)
# SQLAlchemy reserves the 'metadata' attribute on

View File

@ -132,6 +132,10 @@ class SQLReporter(BaseReporter):
return db_build
def getBuilds(self, *args, **kw):
"""Return a list of Build objects"""
return self.connection.getBuilds(*args, **kw)
def report(self, item):
# We're not a real reporter, but we use _formatItemReport, so
# we inherit from the reporters.

96
zuul/lib/times.py Normal file
View File

@ -0,0 +1,96 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import queue
import cachetools
class Times:
"""Perform asynchronous database queries to estimate build times.
To avoid allowing the SQL database to become a bottelneck when
launching builds, this class performs asynchronous queries against
the DB and returns estimated build times.
This is intended as a temporary hedge against performance
regressions during Zuul v4 development and can likely be removed
once multiple schedulers are supported and possible tightening of
database requirements.
"""
log = logging.getLogger("zuul.times")
def __init__(self, sql, statsd):
self.sql = sql
self.statsd = statsd
self.queue = queue.Queue()
self.cache = cachetools.TTLCache(8192, 3600)
self.thread = threading.Thread(target=self.run)
self.running = False
def start(self):
self.running = True
self.thread.start()
def stop(self):
self.running = False
self.queue.put(None)
def join(self):
return self.thread.join()
def run(self):
while self.running:
key = self.queue.get()
if key is None:
continue
try:
# Double check that we haven't added this key since it
# was requested
if key in self.cache:
continue
with self.statsd.timer('zuul.scheduler.time_query'):
self._getTime(key)
except Exception:
self.log.exception("Error querying DB for build %s", key)
def _getTime(self, key):
tenant, project, branch, job = key
previous_builds = self.sql.getBuilds(
tenant=tenant,
project=project,
branch=branch,
job_name=job,
final=True,
result='SUCCESS',
limit=10,
sort_by_buildset=True)
times = [x.duration for x in previous_builds if x.duration]
if times:
estimate = float(sum(times)) / len(times)
self.cache.setdefault(key, estimate)
# Don't cache a zero value, so that new jobs get an estimated
# time ASAP.
def getEstimatedTime(self, tenant, project, branch, job):
key = (tenant, project, branch, job)
ret = self.cache.get(key)
if ret is not None:
return ret
self.queue.put(key)
return None

View File

@ -712,6 +712,14 @@ class PipelineManager(metaclass=ABCMeta):
else:
relative_priority = 0
for job in jobs:
# Request an estimated time here in order to give the time
# thread an opportunity to perform the SQL query in the
# background if necessary.
self.sched.times.getEstimatedTime(
item.pipeline.tenant.name,
item.change.project.name,
getattr(item.change, 'branch', None),
job.name)
provider = self._getPausedParentProvider(build_set, job)
priority = self._calculateNodeRequestPriority(build_set, job)
tenant_name = build_set.item.pipeline.tenant.name

View File

@ -22,7 +22,6 @@ import os
from functools import total_ordering
import re2
import struct
import time
from uuid import uuid4
import urllib.parse
@ -5877,91 +5876,6 @@ class Abide(object):
del self.unparsed_project_branch_cache[canonical_project_name]
class JobTimeData(object):
format = 'B10H10H10B'
version = 0
def __init__(self, path):
self.path = path
self.success_times = [0 for x in range(10)]
self.failure_times = [0 for x in range(10)]
self.results = [0 for x in range(10)]
def load(self):
if not os.path.exists(self.path):
return
with open(self.path, 'rb') as f:
data = struct.unpack(self.format, f.read())
version = data[0]
if version != self.version:
raise Exception("Unkown data version")
self.success_times = list(data[1:11])
self.failure_times = list(data[11:21])
self.results = list(data[21:32])
def save(self):
tmpfile = self.path + '.tmp'
data = [self.version]
data.extend(self.success_times)
data.extend(self.failure_times)
data.extend(self.results)
data = struct.pack(self.format, *data)
with open(tmpfile, 'wb') as f:
f.write(data)
os.rename(tmpfile, self.path)
def add(self, elapsed, result):
elapsed = int(elapsed)
if result == 'SUCCESS':
self.success_times.append(elapsed)
self.success_times.pop(0)
result = 0
else:
self.failure_times.append(elapsed)
self.failure_times.pop(0)
result = 1
self.results.append(result)
self.results.pop(0)
def getEstimatedTime(self):
times = [x for x in self.success_times if x]
if times:
return float(sum(times)) / len(times)
return 0.0
class TimeDataBase(object):
def __init__(self, root):
self.root = root
def _getTD(self, build):
if hasattr(build.build_set.item.change, 'branch'):
branch = build.build_set.item.change.branch
else:
branch = ''
dir_path = os.path.join(
self.root,
build.build_set.item.pipeline.tenant.name,
build.build_set.item.change.project.canonical_name,
branch)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
path = os.path.join(dir_path, build.job.name)
td = JobTimeData(path)
td.load()
return td
def getEstimatedTime(self, name):
return self._getTD(name).getEstimatedTime()
def update(self, build, elapsed, result):
td = self._getTD(build)
td.add(elapsed, result)
td.save()
class Capabilities(object):
"""The set of capabilities this Zuul installation has.

View File

@ -17,7 +17,6 @@
import json
import logging
import os
import socket
import sys
import threading
@ -41,6 +40,7 @@ from zuul.lib.gear_utils import getGearmanFunctions
from zuul.lib.keystorage import KeyStorage
from zuul.lib.logutil import get_annotated_logger
from zuul.lib.queue import NamedQueue
from zuul.lib.times import Times
from zuul.lib.statsd import get_statsd, normalize_statsd_name
import zuul.lib.queue
import zuul.lib.repl
@ -66,7 +66,6 @@ from zuul.model import (
PromoteEvent,
ReconfigureEvent,
TenantReconfigureEvent,
TimeDataBase,
UnparsedAbideConfig,
SystemAttributes,
STATE_FAILED,
@ -158,6 +157,7 @@ class Scheduler(threading.Thread):
self.connections = connections
self.sql = self.connections.getSqlReporter(None)
self.statsd = get_statsd(config)
self.times = Times(self.sql, self.statsd)
self.rpc = rpclistener.RPCListener(config, self)
self.rpc_slow = rpclistener.RPCListenerSlow(config, self)
self.repl = None
@ -224,10 +224,6 @@ class Scheduler(threading.Thread):
self.wake_event.set)
self.local_layout_state = {}
if not testonly:
time_dir = self._get_time_database_dir()
self.time_database = TimeDataBase(time_dir)
command_socket = get_default(
self.config, 'scheduler', 'command_socket',
'/var/lib/zuul/scheduler.socket')
@ -269,6 +265,7 @@ class Scheduler(threading.Thread):
self.rpc_slow.start()
self.stats_thread.start()
self.apsched.start()
self.times.start()
# Start an anonymous thread to perform initial cleanup, then
# schedule later cleanup tasks.
t = threading.Thread(target=self.startCleanup, name='cleanup start')
@ -279,6 +276,7 @@ class Scheduler(threading.Thread):
def stop(self):
self._stopped = True
self.component_info.state = self.component_info.STOPPED
self.times.stop()
self.nodepool.stop()
self.stop_event.set()
self.stopConnections()
@ -293,6 +291,7 @@ class Scheduler(threading.Thread):
self._command_running = False
self.command_socket.stop()
self.command_thread.join()
self.times.join()
self.join()
self.zk_client.disconnect()
@ -917,14 +916,6 @@ class Scheduler(threading.Thread):
result.wait()
self.log.debug("Enqueue complete")
def _get_time_database_dir(self):
state_dir = get_default(self.config, 'scheduler', 'state_dir',
'/var/lib/zuul', expand_user=True)
d = os.path.join(state_dir, 'times')
if not os.path.exists(d):
os.mkdir(d)
return d
def _get_key_store_password(self):
try:
return self.config["keystore"]["password"]
@ -1957,8 +1948,15 @@ class Scheduler(threading.Thread):
log = get_annotated_logger(
self.log, build.zuul_event_id, build=build.uuid)
try:
build.estimated_time = float(self.time_database.getEstimatedTime(
build))
change = build.build_set.item.change
estimate = self.times.getEstimatedTime(
pipeline.tenant.name,
change.project.name,
getattr(change, 'branch', None),
build.job.name)
if not estimate:
estimate = 0.0
build.estimated_time = estimate
except Exception:
log.exception("Exception estimating build time:")
pipeline.manager.onBuildStarted(build)
@ -2084,13 +2082,6 @@ class Scheduler(threading.Thread):
except Exception:
log.exception("Error reporting build completion to DB:")
if build.end_time and build.start_time and build.result:
duration = build.end_time - build.start_time
try:
self.time_database.update(build, duration, build.result)
except Exception:
log.exception("Exception recording build time:")
pipeline.manager.onBuildCompleted(build)
def _cleanupCompletedBuild(self, build):