Store pause and resume events on the build and report them

When a build is paused or resumed, we now store this information on the
build together with the event time. Instead of additional attributes for
each timestamp, we add an "events" list attribute to the build which can
also be used for other events in the future.

The events are stored in the SQL database and added to the MQTT payload
so the information can be used by the zuul-web UI (e.g. in the "build
times" gantt chart) or provided to external services.

Change-Id: I789b4f69faf96e3b8fd090a2e389df3bb9efd602
This commit is contained in:
Felix Edel 2022-12-07 12:24:57 +01:00
parent 532c30469f
commit f9786ac2a8
9 changed files with 220 additions and 5 deletions

View File

@ -0,0 +1,5 @@
---
features:
- |
Zuul now stores the pause and resume events for a build together with
their timestamps and reports them to the SQL database and via MQTT.

View File

@ -130,11 +130,12 @@ class TestSQLConnectionMysql(ZuulTestCase):
sa.sql.select([reporter.connection.zuul_buildset_table]))
buildsets = result.fetchall()
self.assertEqual(4, len(buildsets))
self.assertEqual(5, len(buildsets))
buildset0 = buildsets[0]
buildset1 = buildsets[1]
buildset2 = buildsets[2]
buildset3 = buildsets[3]
buildset4 = buildsets[4]
self.assertEqual('check', buildset0['pipeline'])
self.assertEqual('org/project', buildset0['project'])
@ -221,6 +222,40 @@ class TestSQLConnectionMysql(ZuulTestCase):
buildset3_builds[1]['end_time'],
buildset3_builds[1]['start_time'])
# Check the paused build result
buildset4_builds = conn.execute(
sa.sql.select([
reporter.connection.zuul_build_table
]).where(
reporter.connection.zuul_build_table.c.buildset_id ==
buildset4['id']
).order_by(reporter.connection.zuul_build_table.c.id)
).fetchall()
paused_build_events = conn.execute(
sa.sql.select([
reporter.connection.zuul_build_event_table
]).where(
reporter.connection.zuul_build_event_table.c.build_id
== buildset4_builds[0]["id"]
)
).fetchall()
self.assertEqual(len(paused_build_events), 2)
pause_event = paused_build_events[0]
resume_event = paused_build_events[1]
self.assertEqual(
pause_event["event_type"], "paused")
self.assertIsNotNone(pause_event["event_time"])
self.assertIsNone(pause_event["description"])
self.assertEqual(
resume_event["event_type"], "resumed")
self.assertIsNotNone(resume_event["event_time"])
self.assertIsNone(resume_event["description"])
self.assertGreater(
resume_event["event_time"], pause_event["event_time"])
self.executor_server.hold_jobs_in_build = True
# Add a success result
@ -264,6 +299,23 @@ class TestSQLConnectionMysql(ZuulTestCase):
self.orderedRelease()
self.waitUntilSettled()
# We are pausing a job within this test, so holding the jobs in
# build and releasing them in order becomes difficult as the
# paused job will either be paused or waiting on the child jobs
# to start.
# As we are not interested in the order the jobs are running but
# only on the results in the database, simply deactivate
# hold_jobs_in_build.
self.executor_server.hold_jobs_in_build = False
# Add a paused build result
self.log.debug("Adding paused build result")
D = self.fake_gerrit.addFakeChange("org/project", "master", "D")
self.executor_server.returnData(
"project-merge", D, {"zuul": {"pause": True}})
self.fake_gerrit.addEvent(D.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
check_results()
def test_sql_results_retry_builds(self):
@ -743,6 +795,37 @@ class TestMQTTConnection(ZuulTestCase):
self.assertIn('uuid', mqtt_payload)
self.assertEquals(dependent_test_job['dependencies'], ['test'])
def test_mqtt_paused_job(self):
    """Check that pause/resume events of a build end up in the MQTT payload.

    The "test" job is paused via its returned data; the reported build
    must then carry exactly one "paused" and one "resumed" event, both
    located between the build's start and end time and in that order.
    """
    A = self.fake_gerrit.addFakeChange("org/project", "master", "A")
    # Have the executor pause the job via its returned data
    self.executor_server.returnData("test", A, {"zuul": {"pause": True}})
    self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
    self.waitUntilSettled()

    success_event = self.mqtt_messages.pop()
    mqtt_payload = success_event["msg"]
    # assertEqual instead of the deprecated assertEquals alias, which
    # no longer exists in Python 3.12.
    self.assertEqual(mqtt_payload["project"], "org/project")
    builds = mqtt_payload["buildset"]["builds"]
    paused_job = [b for b in builds if b["job_name"] == "test"][0]
    self.assertEqual(len(paused_job["events"]), 2)

    pause_event = paused_job["events"][0]
    self.assertEqual(pause_event["event_type"], "paused")
    self.assertGreater(
        pause_event["event_time"], paused_job["start_time"])
    self.assertLess(pause_event["event_time"], paused_job["end_time"])

    resume_event = paused_job["events"][1]
    self.assertEqual(resume_event["event_type"], "resumed")
    self.assertGreater(
        resume_event["event_time"], paused_job["start_time"])
    self.assertLess(resume_event["event_time"], paused_job["end_time"])

    # The build must have been paused before it was resumed
    self.assertGreater(
        resume_event["event_time"], pause_event["event_time"])
def test_mqtt_invalid_topic(self):
in_repo_conf = textwrap.dedent(
"""

View File

@ -79,7 +79,8 @@ class MQTTReporter(BaseReporter):
'result': result,
'dependencies': [j.name for j in job.dependencies],
'artifacts': get_artifacts_from_result_data(
build.result_data, logger=log)
build.result_data, logger=log),
'events': [e.toDict() for e in build.events],
})
if include_returned_data:
rdata = build.result_data.copy()

View File

@ -0,0 +1,49 @@
# Copyright 2022 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""add_build_event_table
Revision ID: 0ed5def089e2
Revises: c7467b642498
Create Date: 2022-12-12 12:08:20.882790
"""
# revision identifiers, used by Alembic.
revision = '0ed5def089e2'
down_revision = 'c7467b642498'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
BUILD_EVENT_TABLE = "zuul_build_event"
BUILD_TABLE = "zuul_build"
def upgrade(table_prefix=''):
    """Create the build event table.

    :param table_prefix: string prepended to the name of the new table
        and to the name of the build table referenced by the
        ``build_id`` foreign key.
    """
    event_table_name = table_prefix + BUILD_EVENT_TABLE
    build_id_target = table_prefix + BUILD_TABLE + ".id"
    columns = [
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("build_id", sa.Integer, sa.ForeignKey(build_id_target)),
        sa.Column("event_time", sa.DateTime),
        sa.Column("event_type", sa.String(255)),
        sa.Column("description", sa.TEXT()),
    ]
    op.create_table(event_table_name, *columns)
def downgrade():
    """Refuse to downgrade; this migration can only be applied forwards.

    :raises Exception: always.
    """
    message = "Downgrades not supported"
    raise Exception(message)

View File

@ -29,6 +29,7 @@ from zuul.zk.locks import CONNECTION_LOCK_ROOT, locked, SessionAwareLock
BUILDSET_TABLE = 'zuul_buildset'
BUILD_TABLE = 'zuul_build'
BUILD_EVENTS_TABLE = 'zuul_build_event'
ARTIFACT_TABLE = 'zuul_artifact'
PROVIDES_TABLE = 'zuul_provides'
@ -446,6 +447,15 @@ class SQLConnection(BaseConnection):
session.flush()
return p
def createBuildEvent(self, *args, **kw):
    """Create a build event for this build and flush it to the session.

    All positional and keyword arguments are passed through to the
    ``BuildEventModel`` constructor. The new event gets this build's
    id, is appended to ``self.build_events`` and added to the session
    this build belongs to.

    :returns: the newly created ``BuildEventModel`` instance.
    """
    build_event = BuildEventModel(*args, **kw)
    build_event.build_id = self.id
    self.build_events.append(build_event)

    # Use the session this build object is attached to.
    db_session = orm.session.Session.object_session(self)
    db_session.add(build_event)
    db_session.flush()
    return build_event
class ArtifactModel(Base):
__tablename__ = self.table_prefix + ARTIFACT_TABLE
id = sa.Column(sa.Integer, primary_key=True)
@ -464,6 +474,19 @@ class SQLConnection(BaseConnection):
name = sa.Column(sa.String(255))
build = orm.relationship(BuildModel, backref="provides")
class BuildEventModel(Base):
    """ORM model for a single event recorded for a build.

    Rows are stored in the build event table (its name is the
    connection's table prefix followed by ``BUILD_EVENTS_TABLE``) and
    point to their build via ``build_id``.
    """
    __tablename__ = self.table_prefix + BUILD_EVENTS_TABLE
    id = sa.Column(sa.Integer, primary_key=True)
    # Foreign key to the "id" column of the (prefixed) build table.
    build_id = sa.Column(sa.Integer, sa.ForeignKey(
        self.table_prefix + BUILD_TABLE + ".id"))
    # When the event happened.
    event_time = sa.Column(sa.DateTime)
    # Name of the event, e.g. "paused" or "resumed".
    event_type = sa.Column(sa.String(255))
    # Optional free text.
    description = sa.Column(sa.TEXT())
    # The backref makes the events of a build available on the build
    # model as "build_events".
    build = orm.relationship(BuildModel, backref="build_events")
self.buildEventModel = BuildEventModel
self.zuul_build_event_table = self.buildEventModel.__table__
self.providesModel = ProvidesModel
self.zuul_provides_table = self.providesModel.__table__

View File

@ -163,6 +163,17 @@ class SQLReporter(BaseReporter):
artifact['metadata'] = json.dumps(
artifact['metadata'])
db_build.createArtifact(**artifact)
for event in build.events:
# Reformat the event_time so it's compatible to SQL.
# Don't update the event object in place, but only
# the generated dict representation to not alter the
# datastructure for other reporters.
ev = event.toDict()
ev["event_time"] = datetime.datetime.fromtimestamp(
event.event_time, tz=datetime.timezone.utc)
db_build.createBuildEvent(**ev)
return db_build
except sqlalchemy.exc.DBAPIError:
if retry_count < self.retry_count - 1:

View File

@ -1772,9 +1772,12 @@ class PipelineManager(metaclass=ABCMeta):
if all_completed:
self.sched.executor.resumeBuild(build)
build.updateAttributes(
build_set.item.pipeline.manager.current_context,
paused=False)
with build.activeContext(self.current_context):
build.paused = False
build.addEvent(
model.BuildEvent(
event_time=time.time(),
event_type=model.BuildEvent.TYPE_RESUMED))
def _resetDependentBuilds(self, build_set, build):
job_graph = build_set.job_graph

View File

@ -3725,6 +3725,27 @@ class BuildReference:
self._path = _path
class BuildEvent:
    """A single timestamped event in the life of a build.

    :param event_time: time of the event; callers in this change pass
        the value of ``time.time()``.
    :param event_type: name of the event, e.g. ``TYPE_PAUSED`` or
        ``TYPE_RESUMED``.
    :param description: optional free text, defaults to ``None``.
    """

    TYPE_PAUSED = "paused"
    TYPE_RESUMED = "resumed"

    def __init__(self, event_time, event_type, description=None):
        self.event_time = event_time
        self.event_type = event_type
        self.description = description

    def toDict(self):
        """Return the event as a plain dict (inverse of fromDict)."""
        return {
            "event_time": self.event_time,
            "event_type": self.event_type,
            "description": self.description,
        }

    @classmethod
    def fromDict(cls, data):
        """Create an event from a dict as produced by toDict.

        "event_time" and "event_type" are required (KeyError if
        missing). "description" is optional, mirroring the constructor
        default, so dicts without that key can still be loaded.
        """
        return cls(
            data["event_time"],
            data["event_type"],
            data.get("description"),
        )
class Build(zkobject.ZKObject):
"""A Build is an instance of a single execution of a Job.
@ -3765,6 +3786,8 @@ class Build(zkobject.ZKObject):
zuul_event_id=None,
build_request_ref=None,
span_info=None,
# A list of build events like paused, resume, ...
events=[],
)
def serialize(self, context):
@ -3784,6 +3807,7 @@ class Build(zkobject.ZKObject):
"zuul_event_id": self.zuul_event_id,
"build_request_ref": self.build_request_ref,
"span_info": self.span_info,
"events": [e.toDict() for e in self.events],
}
if COMPONENT_REGISTRY.model_api < 5:
data["_result_data"] = (self._result_data.getPath()
@ -3806,6 +3830,11 @@ class Build(zkobject.ZKObject):
def deserialize(self, raw, context):
data = super().deserialize(raw, context)
# Deserialize build events
data["events"] = [
BuildEvent.fromDict(e) for e in data.get("events", [])
]
# Result data can change (between a pause and build
# completion).
@ -3880,6 +3909,12 @@ class Build(zkobject.ZKObject):
data=secret_result_data,
_path=self.getPath() + '/secret_result_data')
def addEvent(self, event):
    """Append an event to this build's list of events.

    :param event: the event to record.
    :raises Exception: if the build has no active context, i.e. the
        call is not made inside the build's context manager.
    """
    if self._active_context:
        self.events.append(event)
        return
    raise Exception("addEvent must be used with a context manager")
@property
def failed(self):
if self.result and self.result not in ['SUCCESS', 'SKIPPED']:

View File

@ -53,6 +53,7 @@ from zuul.model import (
Abide,
Build,
BuildCompletedEvent,
BuildEvent,
BuildPausedEvent,
BuildStartedEvent,
BuildStatusEvent,
@ -2694,6 +2695,10 @@ class Scheduler(threading.Thread):
# with child job skipping.
with build.activeContext(pipeline.manager.current_context):
build.paused = True
build.addEvent(
BuildEvent(
event_time=time.time(),
event_type=BuildEvent.TYPE_PAUSED))
build.setResultData(
event.data.get("data", {}),
event.data.get("secret_data", {}))