Store BuildSet.dependent_changes as change refs

When we set the configuration of a buildset, we collect all of
the dependent changes and serialize them so the relevant change
information is encoded in the buildset object.

We do this because changes at the head of the pipeline might
merge while we are processing and we still want to include them.

The change.toDict method includes the change's commit message.
In the case of a very large commit message, or a large number of
dependent changes with moderately sized commit messages, we can
exceed the 1MB zk limit.

To address this, store the dependent changes list as a list of
cache key refs and then resolve those changes at the time that we
launch a job.

Change-Id: I3de7f11d6cd5d66eb383f4b962613a1d54365660
This commit is contained in:
James E. Blair 2024-05-21 08:47:16 -07:00
parent dddcdeb850
commit e522c47aa4
5 changed files with 50 additions and 3 deletions

View File

@ -210,3 +210,9 @@ Version 28
:Prior Zuul version: 10.1.0
:Description: Store repo state in blobstore.
Affects schedulers and executor.
Version 29
----------
:Prior Zuul version: 10.1.0
:Description: Store BuildSet.dependent_changes as change refs.
Affects schedulers.

View File

@ -194,6 +194,24 @@ class TestGithubModelUpgrade(ZuulTestCase):
dict(name='integration', result='SUCCESS'),
], ordered=False)
@model_version(28)
@simple_layout('layouts/simple.yaml')
def test_model_28(self):
    """Verify the pre-29 (fully populated) dependent_changes write path."""
    # This exercises the old side of the buildset
    # dependent_changes upgrade. We don't need to perform an
    # upgrade in this test since the only behavior switch is on
    # the write side. The read side will be tested in the new
    # case by the standard test suite, and in the old case by this
    # test.
    A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
    B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
    # B depends on A, so B's buildset carries a dependent change.
    B.setDependsOn(A, 1)
    self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
    self.waitUntilSettled()
    self.assertHistory([
        dict(name='check-job', result='SUCCESS'),
    ], ordered=False)
class TestBranchCacheUpgrade(BaseTestCase):
def setUp(self):

View File

@ -1118,6 +1118,24 @@ class PipelineManager(metaclass=ABCMeta):
log = get_annotated_logger(self.log, item.event)
log.debug("Executing jobs for %s", item)
build_set = item.current_build_set
# dependent_changes is either fully populated (old) or a list of
# change refs we need to convert into the change dict
if (build_set.dependent_changes and
'change_message' in build_set.dependent_changes[0]):
# MODEL_API < 29
dependent_changes = build_set.dependent_changes
else:
resolved_changes = self.resolveChangeReferences(
[c['ref'] for c in build_set.dependent_changes])
dependent_changes = []
for resolved_change, orig_dict in zip(resolved_changes,
build_set.dependent_changes):
change_dict = resolved_change.toDict()
if 'bundle_id' in orig_dict:
change_dict['bundle_id'] = orig_dict['bundle_id']
dependent_changes.append(change_dict)
for job in jobs:
log.debug("Found job %s for %s", job, item)
try:
@ -1125,7 +1143,7 @@ class PipelineManager(metaclass=ABCMeta):
nodes = build_set.getJobNodeList(job)
self.sched.executor.execute(
job, nodes, item, self.pipeline, zone,
build_set.dependent_changes,
dependent_changes,
build_set.merger_items)
job.setWaitingStatus('executor')
except Exception:

View File

@ -4727,9 +4727,14 @@ class BuildSet(zkobject.ZKObject):
self.configured = True
def _toChangeDict(self, item, change):
    """Serialize a dependent change for storage on the BuildSet.

    :param item: The queue item the change belongs to (used to tag
        multi-change items with a bundle id).
    :param change: The change to serialize.
    :returns: A dict; under model API < 29 this is the full change
        dict (which includes the commit message and can be large),
        otherwise only a cache key reference that is resolved again
        when the job is launched.
    """
    if COMPONENT_REGISTRY.model_api < 29:
        # Old model: embed the full change data, including the
        # commit message (this is what can exceed the 1MB ZK limit).
        change_dict = change.toDict()
    else:
        # New model: store only a reference; the change is resolved
        # from the cache at job launch time.
        change_dict = dict(
            ref=change.cache_key,
        )
    # Inject bundle_id into the dict if available; this can be used to
    # decide if changes belong to the same bundle.
    if len(item.changes) > 1:
        change_dict['bundle_id'] = item.uuid
    return change_dict

View File

@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# doc/source/developer/model-changelog.rst
MODEL_API = 29