Merge "Store pause and resume events on the build and report them"
This commit is contained in:
commit
cb40ddc7db
5
releasenotes/notes/paused-events-4adaade5e29fc10e.yaml
Normal file
5
releasenotes/notes/paused-events-4adaade5e29fc10e.yaml
Normal file
@ -0,0 +1,5 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Zuul now stores the pause and resume events for a build together with
|
||||
their timestamps and reports them to the SQL database and via MQTT.
|
@ -130,11 +130,12 @@ class TestSQLConnectionMysql(ZuulTestCase):
|
||||
sa.sql.select([reporter.connection.zuul_buildset_table]))
|
||||
|
||||
buildsets = result.fetchall()
|
||||
self.assertEqual(4, len(buildsets))
|
||||
self.assertEqual(5, len(buildsets))
|
||||
buildset0 = buildsets[0]
|
||||
buildset1 = buildsets[1]
|
||||
buildset2 = buildsets[2]
|
||||
buildset3 = buildsets[3]
|
||||
buildset4 = buildsets[4]
|
||||
|
||||
self.assertEqual('check', buildset0['pipeline'])
|
||||
self.assertEqual('org/project', buildset0['project'])
|
||||
@ -221,6 +222,40 @@ class TestSQLConnectionMysql(ZuulTestCase):
|
||||
buildset3_builds[1]['end_time'],
|
||||
buildset3_builds[1]['start_time'])
|
||||
|
||||
# Check the paused build result
|
||||
buildset4_builds = conn.execute(
|
||||
sa.sql.select([
|
||||
reporter.connection.zuul_build_table
|
||||
]).where(
|
||||
reporter.connection.zuul_build_table.c.buildset_id ==
|
||||
buildset4['id']
|
||||
).order_by(reporter.connection.zuul_build_table.c.id)
|
||||
).fetchall()
|
||||
|
||||
paused_build_events = conn.execute(
|
||||
sa.sql.select([
|
||||
reporter.connection.zuul_build_event_table
|
||||
]).where(
|
||||
reporter.connection.zuul_build_event_table.c.build_id
|
||||
== buildset4_builds[0]["id"]
|
||||
)
|
||||
).fetchall()
|
||||
|
||||
self.assertEqual(len(paused_build_events), 2)
|
||||
pause_event = paused_build_events[0]
|
||||
resume_event = paused_build_events[1]
|
||||
self.assertEqual(
|
||||
pause_event["event_type"], "paused")
|
||||
self.assertIsNotNone(pause_event["event_time"])
|
||||
self.assertIsNone(pause_event["description"])
|
||||
self.assertEqual(
|
||||
resume_event["event_type"], "resumed")
|
||||
self.assertIsNotNone(resume_event["event_time"])
|
||||
self.assertIsNone(resume_event["description"])
|
||||
|
||||
self.assertGreater(
|
||||
resume_event["event_time"], pause_event["event_time"])
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
# Add a success result
|
||||
@ -264,6 +299,23 @@ class TestSQLConnectionMysql(ZuulTestCase):
|
||||
self.orderedRelease()
|
||||
self.waitUntilSettled()
|
||||
|
||||
# We are pausing a job within this test, so holding the jobs in
|
||||
# build and releasing them in order becomes difficult as the
|
||||
# paused job will either be paused or waiting on the child jobs
|
||||
# to start.
|
||||
# As we are not interested in the order the jobs are running but
|
||||
# only on the results in the database, simply deactivate
|
||||
# hold_jobs_in_build.
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
|
||||
# Add a paused build result
|
||||
self.log.debug("Adding paused build result")
|
||||
D = self.fake_gerrit.addFakeChange("org/project", "master", "D")
|
||||
self.executor_server.returnData(
|
||||
"project-merge", D, {"zuul": {"pause": True}})
|
||||
self.fake_gerrit.addEvent(D.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
check_results()
|
||||
|
||||
def test_sql_results_retry_builds(self):
|
||||
@ -743,6 +795,37 @@ class TestMQTTConnection(ZuulTestCase):
|
||||
self.assertIn('uuid', mqtt_payload)
|
||||
self.assertEquals(dependent_test_job['dependencies'], ['test'])
|
||||
|
||||
def test_mqtt_paused_job(self):
    """Check that a paused job reports its pause/resume events via MQTT.

    The "test" job is told to pause through its returned data. After the
    pipeline settles, the last MQTT message must list exactly two events
    for that job -- "paused" followed by "resumed" -- both timestamped
    between the build's start and end times.
    """
    A = self.fake_gerrit.addFakeChange("org/project", "master", "A")
    # Let the job be paused via the executor
    self.executor_server.returnData("test", A, {"zuul": {"pause": True}})
    self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
    self.waitUntilSettled()

    # The most recently published message is the one inspected here.
    success_event = self.mqtt_messages.pop()

    mqtt_payload = success_event["msg"]
    # assertEqual instead of the assertEquals alias, which was removed
    # from unittest in Python 3.12.
    self.assertEqual(mqtt_payload["project"], "org/project")
    builds = mqtt_payload["buildset"]["builds"]
    paused_job = [b for b in builds if b["job_name"] == "test"][0]
    self.assertEqual(len(paused_job["events"]), 2)

    pause_event = paused_job["events"][0]
    self.assertEqual(pause_event["event_type"], "paused")
    self.assertGreater(
        pause_event["event_time"], paused_job["start_time"])
    self.assertLess(pause_event["event_time"], paused_job["end_time"])

    resume_event = paused_job["events"][1]
    self.assertEqual(resume_event["event_type"], "resumed")
    self.assertGreater(
        resume_event["event_time"], paused_job["start_time"])
    self.assertLess(resume_event["event_time"], paused_job["end_time"])

    # The resume must have happened after the pause.
    self.assertGreater(
        resume_event["event_time"], pause_event["event_time"])
|
||||
|
||||
def test_mqtt_invalid_topic(self):
|
||||
in_repo_conf = textwrap.dedent(
|
||||
"""
|
||||
|
@ -79,7 +79,8 @@ class MQTTReporter(BaseReporter):
|
||||
'result': result,
|
||||
'dependencies': [j.name for j in job.dependencies],
|
||||
'artifacts': get_artifacts_from_result_data(
|
||||
build.result_data, logger=log)
|
||||
build.result_data, logger=log),
|
||||
'events': [e.toDict() for e in build.events],
|
||||
})
|
||||
if include_returned_data:
|
||||
rdata = build.result_data.copy()
|
||||
|
@ -0,0 +1,49 @@
|
||||
# Copyright 2022 BMW Group
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""add_build_event_table
|
||||
|
||||
Revision ID: 0ed5def089e2
|
||||
Revises: c7467b642498
|
||||
Create Date: 2022-12-12 12:08:20.882790
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0ed5def089e2'
|
||||
down_revision = 'c7467b642498'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
BUILD_EVENT_TABLE = "zuul_build_event"
|
||||
BUILD_TABLE = "zuul_build"
|
||||
|
||||
|
||||
def upgrade(table_prefix=''):
    """Create the build event table.

    :param table_prefix: Optional prefix put in front of both the new
        table's name and the build table referenced by the foreign key.
    """
    event_table_name = table_prefix + BUILD_EVENT_TABLE
    # Each event row points back at the build it belongs to.
    build_id_target = table_prefix + BUILD_TABLE + ".id"

    columns = [
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("build_id", sa.Integer, sa.ForeignKey(build_id_target)),
        sa.Column("event_time", sa.DateTime),
        sa.Column("event_type", sa.String(255)),
        sa.Column("description", sa.TEXT()),
    ]
    op.create_table(event_table_name, *columns)
|
||||
|
||||
|
||||
def downgrade():
    """Reject any attempt to downgrade; this migration is one-way.

    :raises Exception: always.
    """
    message = "Downgrades not supported"
    raise Exception(message)
|
@ -29,6 +29,7 @@ from zuul.zk.locks import CONNECTION_LOCK_ROOT, locked, SessionAwareLock
|
||||
|
||||
BUILDSET_TABLE = 'zuul_buildset'
|
||||
BUILD_TABLE = 'zuul_build'
|
||||
BUILD_EVENTS_TABLE = 'zuul_build_event'
|
||||
ARTIFACT_TABLE = 'zuul_artifact'
|
||||
PROVIDES_TABLE = 'zuul_provides'
|
||||
|
||||
@ -446,6 +447,15 @@ class SQLConnection(BaseConnection):
|
||||
session.flush()
|
||||
return p
|
||||
|
||||
def createBuildEvent(self, *args, **kw):
    """Create a build event row attached to this build and flush it.

    All positional and keyword arguments are handed unchanged to the
    ``BuildEventModel`` constructor.

    :returns: The newly created ``BuildEventModel`` instance.
    """
    db_session = orm.session.Session.object_session(self)
    build_event = BuildEventModel(*args, **kw)
    # Link the event to this build both by key and via the relationship.
    build_event.build_id = self.id
    self.build_events.append(build_event)
    db_session.add(build_event)
    db_session.flush()
    return build_event
|
||||
|
||||
class ArtifactModel(Base):
|
||||
__tablename__ = self.table_prefix + ARTIFACT_TABLE
|
||||
id = sa.Column(sa.Integer, primary_key=True)
|
||||
@ -464,6 +474,19 @@ class SQLConnection(BaseConnection):
|
||||
name = sa.Column(sa.String(255))
|
||||
build = orm.relationship(BuildModel, backref="provides")
|
||||
|
||||
class BuildEventModel(Base):
    """ORM model for a single event recorded for a build."""

    __tablename__ = self.table_prefix + BUILD_EVENTS_TABLE

    id = sa.Column(sa.Integer, primary_key=True)
    # Reference to the owning row in the (prefixed) build table.
    build_id = sa.Column(
        sa.Integer,
        sa.ForeignKey(self.table_prefix + BUILD_TABLE + ".id"),
    )
    event_time = sa.Column(sa.DateTime)
    event_type = sa.Column(sa.String(255))
    description = sa.Column(sa.TEXT())

    # Exposes the events on the build model as ``build_events``.
    build = orm.relationship(BuildModel, backref="build_events")
|
||||
|
||||
self.buildEventModel = BuildEventModel
|
||||
self.zuul_build_event_table = self.buildEventModel.__table__
|
||||
|
||||
self.providesModel = ProvidesModel
|
||||
self.zuul_provides_table = self.providesModel.__table__
|
||||
|
||||
|
@ -163,6 +163,17 @@ class SQLReporter(BaseReporter):
|
||||
artifact['metadata'] = json.dumps(
|
||||
artifact['metadata'])
|
||||
db_build.createArtifact(**artifact)
|
||||
|
||||
for event in build.events:
|
||||
# Reformat the event_time so it's compatible to SQL.
|
||||
# Don't update the event object in place, but only
|
||||
# the generated dict representation to not alter the
|
||||
# datastructure for other reporters.
|
||||
ev = event.toDict()
|
||||
ev["event_time"] = datetime.datetime.fromtimestamp(
|
||||
event.event_time, tz=datetime.timezone.utc)
|
||||
db_build.createBuildEvent(**ev)
|
||||
|
||||
return db_build
|
||||
except sqlalchemy.exc.DBAPIError:
|
||||
if retry_count < self.retry_count - 1:
|
||||
|
@ -1759,9 +1759,12 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
|
||||
if all_completed:
|
||||
self.sched.executor.resumeBuild(build)
|
||||
build.updateAttributes(
|
||||
build_set.item.pipeline.manager.current_context,
|
||||
paused=False)
|
||||
with build.activeContext(self.current_context):
|
||||
build.paused = False
|
||||
build.addEvent(
|
||||
model.BuildEvent(
|
||||
event_time=time.time(),
|
||||
event_type=model.BuildEvent.TYPE_RESUMED))
|
||||
|
||||
def _resetDependentBuilds(self, build_set, build):
|
||||
job_graph = build_set.job_graph
|
||||
|
@ -3723,6 +3723,27 @@ class BuildReference:
|
||||
self._path = _path
|
||||
|
||||
|
||||
class BuildEvent:
    """A timestamped event in the life of a build (e.g. paused/resumed).

    :param event_time: When the event happened. Callers in this change
        pass ``time.time()``, i.e. seconds since the epoch.
    :param event_type: One of the ``TYPE_*`` constants below.
    :param description: Optional free-form text; defaults to ``None``.
    """

    TYPE_PAUSED = "paused"
    TYPE_RESUMED = "resumed"

    def __init__(self, event_time, event_type, description=None):
        self.event_time = event_time
        self.event_type = event_type
        self.description = description

    def toDict(self):
        """Return the event as a plain dict with all three fields."""
        return {
            "event_time": self.event_time,
            "event_type": self.event_type,
            "description": self.description,
        }

    @classmethod
    def fromDict(cls, data):
        """Build an event from a dict as produced by :meth:`toDict`.

        ``event_time`` and ``event_type`` are required (``KeyError`` if
        missing). ``description`` is optional, mirroring the constructor
        default, so a dict without that key still deserializes.
        """
        return cls(
            data["event_time"],
            data["event_type"],
            data.get("description"),
        )
|
||||
|
||||
|
||||
class Build(zkobject.ZKObject):
|
||||
"""A Build is an instance of a single execution of a Job.
|
||||
|
||||
@ -3763,6 +3784,8 @@ class Build(zkobject.ZKObject):
|
||||
zuul_event_id=None,
|
||||
build_request_ref=None,
|
||||
span_info=None,
|
||||
# A list of build events like paused, resume, ...
|
||||
events=[],
|
||||
)
|
||||
|
||||
def serialize(self, context):
|
||||
@ -3782,6 +3805,7 @@ class Build(zkobject.ZKObject):
|
||||
"zuul_event_id": self.zuul_event_id,
|
||||
"build_request_ref": self.build_request_ref,
|
||||
"span_info": self.span_info,
|
||||
"events": [e.toDict() for e in self.events],
|
||||
}
|
||||
if COMPONENT_REGISTRY.model_api < 5:
|
||||
data["_result_data"] = (self._result_data.getPath()
|
||||
@ -3804,6 +3828,11 @@ class Build(zkobject.ZKObject):
|
||||
def deserialize(self, raw, context):
|
||||
data = super().deserialize(raw, context)
|
||||
|
||||
# Deserialize build events
|
||||
data["events"] = [
|
||||
BuildEvent.fromDict(e) for e in data.get("events", [])
|
||||
]
|
||||
|
||||
# Result data can change (between a pause and build
|
||||
# completion).
|
||||
|
||||
@ -3878,6 +3907,12 @@ class Build(zkobject.ZKObject):
|
||||
data=secret_result_data,
|
||||
_path=self.getPath() + '/secret_result_data')
|
||||
|
||||
def addEvent(self, event):
    """Append ``event`` to this build's list of events.

    :raises Exception: if no active context is set on the build, i.e.
        the call was not made inside the build's context manager.
    """
    if self._active_context:
        self.events.append(event)
        return
    raise Exception("addEvent must be used with a context manager")
|
||||
|
||||
@property
|
||||
def failed(self):
|
||||
if self.result and self.result not in ['SUCCESS', 'SKIPPED']:
|
||||
|
@ -53,6 +53,7 @@ from zuul.model import (
|
||||
Abide,
|
||||
Build,
|
||||
BuildCompletedEvent,
|
||||
BuildEvent,
|
||||
BuildPausedEvent,
|
||||
BuildStartedEvent,
|
||||
BuildStatusEvent,
|
||||
@ -2699,6 +2700,10 @@ class Scheduler(threading.Thread):
|
||||
# with child job skipping.
|
||||
with build.activeContext(pipeline.manager.current_context):
|
||||
build.paused = True
|
||||
build.addEvent(
|
||||
BuildEvent(
|
||||
event_time=time.time(),
|
||||
event_type=BuildEvent.TYPE_PAUSED))
|
||||
build.setResultData(
|
||||
event.data.get("data", {}),
|
||||
event.data.get("secret_data", {}))
|
||||
|
Loading…
x
Reference in New Issue
Block a user