Report all build[set] completions to DB

We missed some cases where builds could be aborted without their
results being reported to the database.  This change updates the
test framework to assert that tests end with no open builds or
buildsets in the database.
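
The check boils down to querying the build and buildset tables at the
end of each test and asserting that every row has a final result.  A
minimal sketch of the idea, assuming a SQLAlchemy engine bound to the
test database and the two Zuul table objects in scope (the real check
is the assertSQLState helper added in the diff below):

    import sqlalchemy

    def assert_no_open_rows(engine, buildset_table, build_table):
        with engine.connect() as conn:
            # Every buildset row must have been closed out with a result.
            for buildset in conn.execute(sqlalchemy.sql.select(buildset_table)):
                assert buildset.result is not None
            # Likewise, every build row needs a result and timestamps.
            for build in conn.execute(sqlalchemy.sql.select(build_table)):
                assert build.result is not None
                assert build.start_time is not None
                assert build.end_time is not None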

To fix the actual issues, we need to report some build completions
in the scheduler instead of the pipeline manager.  To do that, we
grab a SQL reporter object when initializing the scheduler (and
therefore no longer need to do so when initializing the pipeline
manager).  The SQL reporter isn't like the rest of the reporters --
it isn't pipeline-specific, so a single global instance is fine.
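
Roughly, the new ownership looks like this (a simplified sketch; the
real constructors take many more arguments, as the Scheduler and
PipelineManager hunks below show):

    class Scheduler:
        def __init__(self, config, connections, app):
            self.connections = connections
            # A single, pipeline-agnostic SQL reporter owned by the scheduler.
            self.sql = self.connections.getSqlReporter(None)

    class PipelineManager:
        def __init__(self, sched, pipeline):
            self.sched = sched
            # Pipeline managers now share the scheduler's reporter rather
            # than requesting their own.
            self.sql = self.sched.sql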

Finally, initializing the SQL reporter during scheduler init
conflicted with a unit test which checked that the merger could
load "source-only" connections.  That test actually verified that
the *scheduler* loaded source-only connections.  To correct this,
the test now verifies that the executor (which has a merger and is
under the same constraints as the merger for this purpose) can do
so.  We no longer need the source_only flag in tests.
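
For reference, the reworked test asserts against the executor's
connection registry rather than the scheduler's.  A trimmed sketch of
the new assertions (the full version is in the TestConnectionsMerger
hunk below):

    class TestConnectionsMerger(ZuulTestCase):
        config_file = 'zuul-connections-merger.conf'
        tenant_config_file = 'config/single-tenant/main.yaml'

        def test_connections_merger(self):
            "Test merger only configures source connections"
            # The executor embeds a merger, so it should only configure
            # source connections such as gerrit and github.
            connections = self.executor_server.connections.connections
            self.assertIn("gerrit", connections)
            self.assertIn("github", connections)
            self.assertNotIn("smtp", connections)
            self.assertNotIn("sql", connections)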

Change-Id: I1a983dcc9f4e5282c11af23813a4ca1c0f8e9d9d
James E. Blair 2021-07-23 10:46:39 -07:00
parent 35e4745b09
commit 1fdf9f5490
6 changed files with 76 additions and 21 deletions

View File

@@ -66,6 +66,7 @@ from git.util import IterableList
import yaml
import paramiko
import prometheus_client.exposition
import sqlalchemy
from zuul.driver.sql.sqlconnection import DatabaseSession
from zuul.model import (
@@ -4143,7 +4144,7 @@ class SymLink(object):
class SchedulerTestApp:
def __init__(self, log, config, changes, additional_event_queues,
upstream_root, rpcclient, poller_events,
git_url_with_auth, source_only, fake_sql,
git_url_with_auth, fake_sql,
add_cleanup, validate_tenants, instance_id):
self.log = log
self.config = config
@@ -4162,7 +4163,7 @@ class SchedulerTestApp:
add_cleanup,
fake_sql,
)
self.connections.configure(self.config, source_only=source_only)
self.connections.configure(self.config)
self.sched = TestScheduler(self.config, self.connections, self)
self.sched.log = logging.getLogger(f"zuul.Scheduler-{instance_id}")
@@ -4212,7 +4213,7 @@ class SchedulerTestManager:
def create(self, log, config, changes, additional_event_queues,
upstream_root, rpcclient, poller_events,
git_url_with_auth, source_only, fake_sql, add_cleanup,
git_url_with_auth, fake_sql, add_cleanup,
validate_tenants):
# Since the config contains a regex we cannot use copy.deepcopy()
# as this will raise an exception with Python <3.7
@@ -4233,7 +4234,7 @@ class SchedulerTestManager:
app = SchedulerTestApp(log, scheduler_config, changes,
additional_event_queues, upstream_root,
rpcclient, poller_events,
git_url_with_auth, source_only,
git_url_with_auth,
fake_sql, add_cleanup,
validate_tenants, instance_id)
self.instances.append(app)
@@ -4346,7 +4347,6 @@ class ZuulTestCase(BaseTestCase):
use_ssl: bool = False
git_url_with_auth: bool = False
log_console_port: int = 19885
source_only: bool = False
fake_sql: bool = False
validate_tenants = None
@@ -4504,7 +4504,7 @@ class ZuulTestCase(BaseTestCase):
self.upstream_root, self.rpcclient, self.poller_events,
self.git_url_with_auth, self.addCleanup, True)
executor_connections.configure(self.config,
source_only=self.source_only)
source_only=True)
self.executor_api = TestingExecutorApi(self.zk_client)
self.executor_server = RecordingExecutorServer(
self.config,
@@ -4522,7 +4522,7 @@ class ZuulTestCase(BaseTestCase):
self.scheds.create(
self.log, self.config, self.changes,
self.additional_event_queues, self.upstream_root, self.rpcclient,
self.poller_events, self.git_url_with_auth, self.source_only,
self.poller_events, self.git_url_with_auth,
self.fake_sql, self.addCleanup, self.validate_tenants)
self.merge_server = None
@@ -4830,6 +4830,32 @@ class ZuulTestCase(BaseTestCase):
with open(os.path.join(root, fn)) as f:
self.assertTrue(f.read() in test_keys)
def assertSQLState(self):
reporter = self.scheds.first.connections.getSqlReporter(None)
with self.scheds.first.connections.getSqlConnection().\
engine.connect() as conn:
try:
result = conn.execute(
sqlalchemy.sql.select(
reporter.connection.zuul_buildset_table)
)
except sqlalchemy.exc.ProgrammingError:
# Table doesn't exist
return
for buildset in result.fetchall():
self.assertIsNotNone(buildset.result)
result = conn.execute(
sqlalchemy.sql.select(reporter.connection.zuul_build_table)
)
for build in result.fetchall():
self.assertIsNotNone(build.result)
self.assertIsNotNone(build.start_time)
self.assertIsNotNone(build.end_time)
def assertFinalState(self):
self.log.debug("Assert final state")
# Make sure no jobs are running
@@ -4847,6 +4873,7 @@ class ZuulTestCase(BaseTestCase):
self.assertEmptyQueues()
self.assertNodepoolState()
self.assertNoGeneratedKeys()
self.assertSQLState()
ipm = zuul.manager.independent.IndependentPipelineManager
for tenant in self.scheds.first.sched.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():

View File

@@ -539,17 +539,16 @@ class TestMultipleGerrits(ZuulTestCase):
class TestConnectionsMerger(ZuulTestCase):
config_file = 'zuul-connections-merger.conf'
tenant_config_file = 'config/single-tenant/main.yaml'
source_only = True
def test_connections_merger(self):
"Test merger only configures source connections"
self.assertIn("gerrit", self.scheds.first.connections.connections)
self.assertIn("github", self.scheds.first.connections.connections)
self.assertNotIn("smtp", self.scheds.first.connections.connections)
self.assertNotIn("sql", self.scheds.first.connections.connections)
self.assertNotIn("timer", self.scheds.first.connections.connections)
self.assertNotIn("zuul", self.scheds.first.connections.connections)
self.assertIn("gerrit", self.executor_server.connections.connections)
self.assertIn("github", self.executor_server.connections.connections)
self.assertNotIn("smtp", self.executor_server.connections.connections)
self.assertNotIn("sql", self.executor_server.connections.connections)
self.assertNotIn("timer", self.executor_server.connections.connections)
self.assertNotIn("zuul", self.executor_server.connections.connections)
class TestConnectionsCgit(ZuulTestCase):

View File

@@ -92,7 +92,7 @@ class TestSchedulerZone(ZuulTestCase):
self.upstream_root, self.rpcclient, self.poller_events,
self.git_url_with_auth, self.addCleanup, self.fake_sql)
executor_connections.configure(self.config,
source_only=self.source_only)
source_only=True)
self.executor_server_unzoned = RecordingExecutorServer(
config,
connections=executor_connections,

View File

@@ -90,7 +90,7 @@ class SQLReporter(BaseReporter):
)
return db_buildset
def reportBuildsetEnd(self, buildset, action, final):
def reportBuildsetEnd(self, buildset, action, final, result=None):
if not buildset.uuid:
return
if final:
@@ -102,7 +102,7 @@ class SQLReporter(BaseReporter):
db_buildset = db.getBuildset(
tenant=buildset.item.pipeline.tenant.name, uuid=buildset.uuid)
if db_buildset:
db_buildset.result = buildset.result
db_buildset.result = buildset.result or result
db_buildset.message = message
elif buildset.builds:
self.log.error("Unable to find buildset "

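The new optional result argument above lets a caller record a final
disposition for a buildset that was never reported in memory.  The
call sites added in the pipeline manager and scheduler hunks below use
it roughly like this (illustrative excerpt):

    # When an item is removed without being reported, backfill the
    # buildset row in the database with a DEQUEUED result.
    self.sql.reportBuildsetEnd(item.current_build_set, 'dequeue',
                               final=False, result='DEQUEUED')
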
View File

@@ -60,7 +60,7 @@ class PipelineManager(metaclass=ABCMeta):
self.ref_filters = []
# Cached dynamic layouts (layout uuid -> layout)
self._layout_cache = {}
self.sql = self.sched.connections.getSqlReporter(pipeline)
self.sql = self.sched.sql
def __str__(self):
return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
@@ -196,7 +196,7 @@ class PipelineManager(metaclass=ABCMeta):
)
# This might be called after canceljobs, which also sets a
# non-final 'cancel' result.
self.sql.reportBuildsetEnd(item.current_build_set, 'cancel',
self.sql.reportBuildsetEnd(item.current_build_set, 'dequeue',
final=False)
def sendReport(self, action_reporters, item, message=None):
@@ -554,6 +554,7 @@ class PipelineManager(metaclass=ABCMeta):
# Without this check, all items with a valid result would be reported
# twice.
if not item.current_build_set.result and item.live:
item.setReportedResult('DEQUEUED')
self.reportDequeue(item)
def removeItem(self, item):
@@ -716,8 +717,11 @@ class PipelineManager(metaclass=ABCMeta):
# reported immediately afterwards during queue processing.
if (prime and item.current_build_set.ref and not
item.didBundleStartReporting()):
# Force a dequeued result here because we haven't actually
# reported the item, but we are done with this buildset.
self.sql.reportBuildsetEnd(
item.current_build_set, 'dequeue', final=False)
item.current_build_set, 'dequeue', final=False,
result='DEQUEUED')
item.resetAllBuilds()
for job in jobs_to_cancel:
@@ -1422,7 +1426,6 @@ class PipelineManager(metaclass=ABCMeta):
item = build.build_set.item
log.debug("Build %s of %s completed" % (build, item.change))
self.sql.reportBuildEnd(build, final=(not build.retry))
item.pipeline.tenant.semaphore_handler.release(item, build.job)
if item.getJob(build.job.name) is None:

View File

@@ -145,6 +145,7 @@ class Scheduler(threading.Thread):
self._zuul_app = app
self.connections = connections
self.sql = self.connections.getSqlReporter(None)
self.statsd = get_statsd(config)
self.rpc = rpclistener.RPCListener(config, self)
self.rpc_slow = rpclistener.RPCListenerSlow(config, self)
@@ -1124,6 +1125,13 @@ class Scheduler(threading.Thread):
item.getJob(request_job),
)
)
try:
self.sql.reportBuildsetEnd(
item.current_build_set, 'dequeue',
final=False, result='DEQUEUED')
except Exception:
self.log.exception(
"Error reporting buildset completion to DB:")
for build in builds_to_cancel:
self.log.info(
@@ -1216,6 +1224,13 @@ class Scheduler(threading.Thread):
item.getJob(request_job),
)
)
try:
self.sql.reportBuildsetEnd(
item.current_build_set, 'dequeue',
final=False, result='DEQUEUED')
except Exception:
self.log.exception(
"Error reporting buildset completion to DB:")
for build in builds_to_cancel:
self.log.info(
@@ -1826,6 +1841,11 @@ class Scheduler(threading.Thread):
# internal dict after it's added to the report queue.
self.executor.removeBuild(build)
try:
self.sql.reportBuildEnd(build, final=(not build.retry))
except Exception:
log.exception("Error reporting build completion to DB:")
if build.build_set is not build.build_set.item.current_build_set:
log.debug("Build %s is not in the current build set", build)
return
@@ -2054,6 +2074,12 @@ class Scheduler(threading.Thread):
del self.executor.builds[build.uuid]
except KeyError:
pass
try:
self.sql.reportBuildEnd(build, final=False)
except Exception:
self.log.exception(
"Error reporting build completion to DB:")
else:
nodeset = buildset.getJobNodeSet(job_name)
if nodeset: