From 97811f53d672c53babcca9365acefe5f6d4dee7a Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Mon, 17 May 2021 19:58:32 -0700 Subject: [PATCH] Report intermediate buildsets and builds This moves some functions of the SQL reporter into the pipeline manager, so that builds and buildsets are always recorded in the database when started and when completed. The 'final' flag is used to indicate whether a build or buildset result is user-visible or not. Change-Id: I053e195d120ecbb2fd89cf7e1e9fc7eccc9dcd2f --- tests/unit/test_connection.py | 62 +++++++++------------- tests/unit/test_database.py | 39 ++++++++++++++ tests/unit/test_scheduler.py | 16 ++---- zuul/configloader.py | 17 ------ zuul/driver/sql/sqlconnection.py | 27 +++++++++- zuul/driver/sql/sqlreporter.py | 91 ++++++++++++++++++++++++++++++++ zuul/manager/__init__.py | 24 +++++++++ zuul/reporter/__init__.py | 9 ++-- 8 files changed, 215 insertions(+), 70 deletions(-) diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index 3f70e2e297..cb1f7b6bae 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -126,16 +126,15 @@ class TestSQLConnectionMysql(ZuulTestCase): def test_sql_results(self): "Test results are entered into an sql table" - def check_results(connection_name): + def check_results(): # Grab the sa tables tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - reporter = _get_reporter_from_connection_name( - tenant.layout.pipelines['check'].success_actions, - connection_name - ) - - conn = self.scheds.first.connections.connections[connection_name].\ + pipeline = tenant.layout.pipelines['check'] + reporter = self.scheds.first.connections.getSqlReporter( + pipeline) + conn = self.scheds.first.connections.getSqlConnection().\ engine.connect() + result = conn.execute( sa.sql.select([reporter.connection.zuul_buildset_table])) @@ -229,22 +228,21 @@ class TestSQLConnectionMysql(ZuulTestCase): self.orderedRelease() self.waitUntilSettled() - 
check_results('database') + check_results() def test_sql_results_retry_builds(self): "Test that retry results are entered into an sql table correctly" # Check the results - def check_results(connection_name): + def check_results(): # Grab the sa tables tenant = self.scheds.first.sched.abide.tenants.get("tenant-one") - reporter = _get_reporter_from_connection_name( - tenant.layout.pipelines["check"].success_actions, - connection_name - ) + pipeline = tenant.layout.pipelines['check'] + reporter = self.scheds.first.connections.getSqlReporter( + pipeline) - with self.scheds.first.connections.connections[connection_name]\ - .engine.connect() as conn: + with self.scheds.first.connections.getSqlConnection().\ + engine.connect() as conn: result = conn.execute( sa.sql.select([reporter.connection.zuul_buildset_table]) @@ -282,13 +280,13 @@ class TestSQLConnectionMysql(ZuulTestCase): self.assertEqual('project-test1', buildset0_builds[1]['job_name']) self.assertEqual('RETRY', buildset0_builds[1]['result']) self.assertFalse(buildset0_builds[1]['final']) - self.assertEqual('project-test1', buildset0_builds[2]['job_name']) - self.assertEqual('SUCCESS', buildset0_builds[2]['result']) - self.assertTrue(buildset0_builds[2]['final']) + self.assertEqual('project-test2', buildset0_builds[2]['job_name']) + self.assertEqual('RETRY', buildset0_builds[2]['result']) + self.assertFalse(buildset0_builds[2]['final']) - self.assertEqual('project-test2', buildset0_builds[3]['job_name']) - self.assertEqual('RETRY', buildset0_builds[3]['result']) - self.assertFalse(buildset0_builds[3]['final']) + self.assertEqual('project-test1', buildset0_builds[3]['job_name']) + self.assertEqual('SUCCESS', buildset0_builds[3]['result']) + self.assertTrue(buildset0_builds[3]['final']) self.assertEqual('project-test2', buildset0_builds[4]['job_name']) self.assertEqual('SUCCESS', buildset0_builds[4]['result']) self.assertTrue(buildset0_builds[4]['final']) @@ -309,7 +307,7 @@ class 
TestSQLConnectionMysql(ZuulTestCase): self.orderedRelease() self.waitUntilSettled() - check_results('database') + check_results() def test_multiple_sql_connections(self): "Test putting results in different databases" @@ -326,14 +324,13 @@ class TestSQLConnectionMysql(ZuulTestCase): def check_results(connection_name_1, connection_name_2): # Grab the sa tables for resultsdb - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - reporter1 = _get_reporter_from_connection_name( - tenant.layout.pipelines['check'].success_actions, - connection_name_1 - ) + tenant = self.scheds.first.sched.abide.tenants.get("tenant-one") + pipeline = tenant.layout.pipelines['check'] + reporter1 = self.scheds.first.connections.getSqlReporter( + pipeline) - conn = self.scheds.first.connections.\ - connections[connection_name_1].engine.connect() + conn = self.scheds.first.connections.getSqlConnection().\ + engine.connect() buildsets_resultsdb = conn.execute(sa.sql.select( [reporter1.connection.zuul_buildset_table])).fetchall() # Should have been 2 buildset reported to the resultsdb (both @@ -350,13 +347,6 @@ class TestSQLConnectionMysql(ZuulTestCase): self.assertEqual( 'Build succeeded.', buildsets_resultsdb[0]['message']) - # Grab the sa tables for resultsdb_mysql_failures - reporter2 = _get_reporter_from_connection_name( - tenant.layout.pipelines['check'].failure_actions, - connection_name_2 - ) - self.assertIsNone(reporter2) # Explicit SQL reporters are ignored - buildsets_resultsdb_failures = conn.execute(sa.sql.select( [reporter1.connection.zuul_buildset_table])).fetchall() # The failure db should only have 1 buildset failed diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index b7824c1d58..d486b40c71 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -75,6 +75,45 @@ class TestMysqlDatabase(BaseTestCase): f"show create table {table}").one()[1] self.compareMysql(create, sqlalchemy_tables[table]) + def test_buildsets(self): + 
tenant = 'tenant1' + buildset_uuid = 'deadbeef' + change = 1234 + buildset_args = dict( + uuid=buildset_uuid, + tenant=tenant, + pipeline='check', + project='project', + change=change, + patchset='1', + ref='', + oldrev='', + newrev='', + branch='master', + zuul_ref='Zdeadbeef', + ref_url='http://example.com/1234', + event_id='eventid', + ) + + # Create the buildset entry (driver-internal interface) + with self.connection.getSession() as db: + db.createBuildSet(**buildset_args) + + # Verify that worked using the driver-external interface + self.assertEqual(len(self.connection.getBuildsets()), 1) + self.assertEqual(self.connection.getBuildsets()[0].uuid, buildset_uuid) + + # Update the buildset using the internal interface + with self.connection.getSession() as db: + db_buildset = db.getBuildset(tenant=tenant, uuid=buildset_uuid) + self.assertEqual(db_buildset.change, change) + db_buildset.result = 'SUCCESS' + + # Verify that worked + db_buildset = self.connection.getBuildset( + tenant=tenant, uuid=buildset_uuid) + self.assertEqual(db_buildset.result, 'SUCCESS') + class TestPostgresqlDatabase(BaseTestCase): def setUp(self): diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 32ca51217e..cc18b6a90c 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -5539,34 +5539,28 @@ For CI problems and help debugging, contact ci@example.org""" tenant.layout.pipelines['gate'].merge_failure_message) self.assertEqual( - len(tenant.layout.pipelines['check'].merge_failure_actions), 2) + len(tenant.layout.pipelines['check'].merge_failure_actions), 1) self.assertEqual( - len(tenant.layout.pipelines['gate'].merge_failure_actions), 3) + len(tenant.layout.pipelines['gate'].merge_failure_actions), 2) self.assertTrue(isinstance( - tenant.layout.pipelines['check'].merge_failure_actions[1], + tenant.layout.pipelines['check'].merge_failure_actions[0], gerritreporter.GerritReporter)) self.assertTrue( ( 
isinstance(tenant.layout.pipelines['gate']. merge_failure_actions[0], - zuul.driver.sql.sqlreporter.SQLReporter) and - isinstance(tenant.layout.pipelines['gate']. - merge_failure_actions[1], zuul.driver.smtp.smtpreporter.SMTPReporter) and isinstance(tenant.layout.pipelines['gate']. - merge_failure_actions[2], + merge_failure_actions[1], gerritreporter.GerritReporter) ) or ( isinstance(tenant.layout.pipelines['gate']. merge_failure_actions[0], - zuul.driver.sql.sqlreporter.SQLReporter) and - isinstance(tenant.layout.pipelines['gate']. - merge_failure_actions[1], gerritreporter.GerritReporter) and isinstance(tenant.layout.pipelines['gate']. - merge_failure_actions[2], + merge_failure_actions[1], zuul.driver.smtp.smtpreporter.SMTPReporter) ) ) diff --git a/zuul/configloader.py b/zuul/configloader.py index bfff55ddea..e4c5608fc5 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -1262,13 +1262,6 @@ class PipelineParser(object): reporter_set = [] allowed_reporters = self.pcontext.tenant.allowed_reporters if conf.get(conf_key): - if conf_key in ['success', 'failure', 'merge-failure']: - # SQL reporters are required (an implied SQL reporter is - # added to every pipeline, ...(1) - sql_reporter = self.pcontext.connections\ - .getSqlReporter(pipeline) - sql_reporter.setAction(conf_key) - reporter_set.append(sql_reporter) for reporter_name, params \ in conf.get(conf_key).items(): if allowed_reporters is not None and \ @@ -1290,16 +1283,6 @@ class PipelineParser(object): if not pipeline.merge_failure_actions: pipeline.merge_failure_actions = pipeline.failure_actions - for conf_key, action in self.reporter_actions.items(): - if conf_key in ['success', 'failure', 'merge-failure']\ - and not getattr(pipeline, action): - # SQL reporters are required ... add SQL reporters to the - # rest of actions. 
- sql_reporter = self.pcontext.connections\ - .getSqlReporter(pipeline) - sql_reporter.setAction(conf_key) - setattr(pipeline, action, [sql_reporter]) - pipeline.disable_at = conf.get( 'disable-after-consecutive-failures', None) diff --git a/zuul/driver/sql/sqlconnection.py b/zuul/driver/sql/sqlconnection.py index 4d1e164def..474d227de2 100644 --- a/zuul/driver/sql/sqlconnection.py +++ b/zuul/driver/sql/sqlconnection.py @@ -116,6 +116,30 @@ class DatabaseSession(object): except sqlalchemy.orm.exc.NoResultFound: return [] + def getBuild(self, tenant, uuid): + build_table = self.connection.zuul_build_table + buildset_table = self.connection.zuul_buildset_table + + # contains_eager allows us to perform eager loading on the + # buildset *and* use that table in filters (unlike + # joinedload). + q = self.session().query(self.connection.buildModel).\ + join(self.connection.buildSetModel).\ + outerjoin(self.connection.providesModel).\ + options(orm.contains_eager(self.connection.buildModel.buildset), + orm.selectinload(self.connection.buildModel.provides), + orm.selectinload(self.connection.buildModel.artifacts)) + + q = self.listFilter(q, buildset_table.c.tenant, tenant) + q = self.listFilter(q, build_table.c.uuid, uuid) + + try: + return q.one() + except sqlalchemy.orm.exc.NoResultFound: + return None + except sqlalchemy.orm.exc.MultipleResultsFound: + raise Exception("Multiple builds found with uuid %s", uuid) + def createBuildSet(self, *args, **kw): bs = self.connection.buildSetModel(*args, **kw) self.session().add(bs) @@ -173,8 +197,7 @@ class DatabaseSession(object): except sqlalchemy.orm.exc.NoResultFound: return None except sqlalchemy.orm.exc.MultipleResultsFound: - self.log.error("Multiple buildset found with uuid %s", uuid) - return None + raise Exception("Multiple buildset found with uuid %s", uuid) class SQLConnection(BaseConnection): diff --git a/zuul/driver/sql/sqlreporter.py b/zuul/driver/sql/sqlreporter.py index ce2839b275..a633d0d64c 100644 --- 
a/zuul/driver/sql/sqlreporter.py +++ b/zuul/driver/sql/sqlreporter.py @@ -63,6 +63,97 @@ class SQLReporter(BaseReporter): return db_build + def reportBuildsetStart(self, buildset): + """Create the initial buildset entry in the db""" + if not buildset.uuid: + return + event_id = None + item = buildset.item + if item.event is not None: + event_id = getattr(item.event, "zuul_event_id", None) + + with self.connection.getSession() as db: + db_buildset = db.createBuildSet( + uuid=buildset.uuid, + tenant=item.pipeline.tenant.name, + pipeline=item.pipeline.name, + project=item.change.project.name, + change=getattr(item.change, 'number', None), + patchset=getattr(item.change, 'patchset', None), + ref=getattr(item.change, 'ref', ''), + oldrev=getattr(item.change, 'oldrev', ''), + newrev=getattr(item.change, 'newrev', ''), + branch=getattr(item.change, 'branch', ''), + zuul_ref=buildset.ref, + ref_url=item.change.url, + event_id=event_id, + ) + return db_buildset + + def reportBuildsetEnd(self, buildset, action, final): + if not buildset.uuid: + return + if final: + message = self._formatItemReport( + buildset.item, with_jobs=False, action=action) + else: + message = None + with self.connection.getSession() as db: + db_buildset = db.getBuildset( + tenant=buildset.item.pipeline.tenant.name, uuid=buildset.uuid) + db_buildset.result = buildset.result + db_buildset.message = message + + def reportBuildStart(self, build): + buildset = build.build_set + start_time = build.start_time or time.time() + start = datetime.datetime.fromtimestamp(start_time, + tz=datetime.timezone.utc) + with self.connection.getSession() as db: + db_buildset = db.getBuildset( + tenant=buildset.item.pipeline.tenant.name, uuid=buildset.uuid) + + db_build = db_buildset.createBuild( + uuid=build.uuid, + job_name=build.job.name, + start_time=start, + voting=build.job.voting, + node_name=build.node_name, + ) + return db_build + + def reportBuildEnd(self, build, final): + end_time = build.end_time or time.time() 
+ end = datetime.datetime.fromtimestamp(end_time, + tz=datetime.timezone.utc) + with self.connection.getSession() as db: + db_build = db.getBuild( + tenant=build.build_set.item.pipeline.tenant.name, + uuid=build.uuid) + if not db_build: + return None + + db_build.result = build.result + db_build.end_time = end + db_build.log_url = build.log_url + db_build.node_name = build.node_name + db_build.error_detail = build.error_detail + db_build.final = final + db_build.held = build.held + + for provides in build.job.provides: + db_build.createProvides(name=provides) + + for artifact in get_artifacts_from_result_data( + build.result_data, + logger=self.log): + if 'metadata' in artifact: + artifact['metadata'] = json.dumps(artifact['metadata']) + db_build.createArtifact(**artifact) + + return db_build + + # TODO: remove def report(self, item): """Create an entry into a database.""" event_id = None diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 249a3d8aa3..e0d9ee2b5d 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -60,6 +60,7 @@ class PipelineManager(metaclass=ABCMeta): self.ref_filters = [] # Cached dynamic layouts (layout uuid -> layout) self._layout_cache = {} + self.sql = self.sched.connections.getSqlReporter(pipeline) def __str__(self): return "<%s %s>" % (self.__class__.__name__, self.pipeline.name) @@ -193,6 +194,10 @@ class PipelineManager(metaclass=ABCMeta): self.log.error( "Reporting item dequeue %s received: %s", item, ret ) + # This might be called after canceljobs, which also sets a + # non-final 'cancel' result. + self.sql.reportBuildsetEnd(item.current_build_set, 'cancel', + final=False) def sendReport(self, action_reporters, item, message=None): """Sends the built message off to configured reporters. 
@@ -415,6 +420,8 @@ class PipelineManager(metaclass=ABCMeta): ci ): self.sendReport(actions, ci) + self.sql.reportBuildsetEnd(ci.current_build_set, + 'failure', final=True) return False @@ -679,6 +686,8 @@ class PipelineManager(metaclass=ABCMeta): # reported immediately afterwards during queue processing. if (prime and item.current_build_set.ref and not item.didBundleStartReporting()): + self.sql.reportBuildsetEnd( + item.current_build_set, 'dequeue', final=False) item.resetAllBuilds() for job in jobs_to_cancel: @@ -1027,6 +1036,7 @@ class PipelineManager(metaclass=ABCMeta): tpc = tenant.project_configs.get(item.change.project.canonical_name) if not build_set.ref: build_set.setConfiguration() + self.sql.reportBuildsetStart(build_set) # Next, if a change ahead has a broken config, then so does # this one. Record that and don't do anything else. @@ -1275,6 +1285,8 @@ class PipelineManager(metaclass=ABCMeta): for ri in reported_items: ri.setReportedResult('FAILURE') self.sendReport(actions, ri) + self.sql.reportBuildsetEnd(ri.current_build_set, + 'failure', final=True) def processQueue(self): # Do whatever needs to be done for each change in the queue @@ -1316,6 +1328,7 @@ class PipelineManager(metaclass=ABCMeta): def onBuildStarted(self, build): log = get_annotated_logger(self.log, build.zuul_event_id) log.debug("Build %s started", build) + self.sql.reportBuildStart(build) return True def onBuildPaused(self, build): @@ -1378,6 +1391,7 @@ class PipelineManager(metaclass=ABCMeta): item = build.build_set.item log.debug("Build %s of %s completed" % (build, item.change)) + self.sql.reportBuildEnd(build, final=(not build.retry)) item.pipeline.tenant.semaphore_handler.release(item, build.job) if item.getJob(build.job.name) is None: @@ -1541,42 +1555,51 @@ class PipelineManager(metaclass=ABCMeta): log.debug("Project %s not in pipeline %s for change %s", item.change.project, self.pipeline, item.change) project_in_pipeline = False + action = 'no-jobs' actions = 
self.pipeline.no_jobs_actions item.setReportedResult('NO_JOBS') elif item.getConfigErrors(): log.debug("Invalid config for change %s", item.change) # TODOv3(jeblair): consider a new reporter action for this + action = 'merge-failure' actions = self.pipeline.merge_failure_actions item.setReportedResult('CONFIG_ERROR') elif item.didMergerFail(): log.debug("Merger failure") + action = 'merge-failure' actions = self.pipeline.merge_failure_actions item.setReportedResult('MERGER_FAILURE') elif item.wasDequeuedNeedingChange(): log.debug("Dequeued needing change") + action = 'failure' actions = self.pipeline.failure_actions item.setReportedResult('FAILURE') elif not item.getJobs(): # We don't send empty reports with +1 log.debug("No jobs for change %s", item.change) + action = 'no-jobs' actions = self.pipeline.no_jobs_actions item.setReportedResult('NO_JOBS') elif item.cannotMergeBundle(): log.debug("Bundle can not be merged") + action = 'failure' actions = self.pipeline.failure_actions item.setReportedResult("FAILURE") elif item.isBundleFailing(): log.debug("Bundle is failing") + action = 'failure' actions = self.pipeline.failure_actions item.setReportedResult("FAILURE") if not item.didAllJobsSucceed(): self.pipeline._consecutive_failures += 1 elif item.didAllJobsSucceed() and not item.isBundleFailing(): log.debug("success %s", self.pipeline.success_actions) + action = 'success' actions = self.pipeline.success_actions item.setReportedResult('SUCCESS') self.pipeline._consecutive_failures = 0 else: + action = 'failure' actions = self.pipeline.failure_actions item.setReportedResult('FAILURE') self.pipeline._consecutive_failures += 1 @@ -1593,6 +1616,7 @@ class PipelineManager(metaclass=ABCMeta): ret = self.sendReport(actions, item) if ret: log.error("Reporting item %s received: %s", item, ret) + self.sql.reportBuildsetEnd(item.current_build_set, action, final=True) return ret def reportStats(self, item, added=False): diff --git a/zuul/reporter/__init__.py 
b/zuul/reporter/__init__.py index 0c3780ffc9..8dae86cd88 100644 --- a/zuul/reporter/__init__.py +++ b/zuul/reporter/__init__.py @@ -115,7 +115,7 @@ class BaseReporter(object, metaclass=abc.ABCMeta): del comments[fn] item.warning("Comments left for invalid file %s" % (fn,)) - def _getFormatter(self): + def _getFormatter(self, action): format_methods = { 'enqueue': self._formatItemReportEnqueue, 'start': self._formatItemReportStart, @@ -126,12 +126,13 @@ class BaseReporter(object, metaclass=abc.ABCMeta): 'disabled': self._formatItemReportDisabled, 'dequeue': self._formatItemReportDequeue, } - return format_methods[self._action] + return format_methods[action] - def _formatItemReport(self, item, with_jobs=True): + def _formatItemReport(self, item, with_jobs=True, action=None): """Format a report from the given items. Usually to provide results to a reporter taking free-form text.""" - ret = self._getFormatter()(item, with_jobs) + action = action or self._action + ret = self._getFormatter(action)(item, with_jobs) if item.current_build_set.warning_messages: warning = '\n '.join(item.current_build_set.warning_messages)